From 984b61c6d4e54ba3685d1383ce8ffa5111b2ce57 Mon Sep 17 00:00:00 2001 From: Peter Schuurman Date: Tue, 26 Nov 2024 15:17:46 -0800 Subject: [PATCH] Migrate metric defer() statements to gRPC metric interceptor. This allows for more accurate error code reporting if gRPC functionality is refactored --- cmd/gce-pd-csi-driver/main.go | 4 +- pkg/gce-pd-csi-driver/controller.go | 102 ++-------- pkg/gce-pd-csi-driver/gce-pd-driver.go | 5 +- pkg/gce-pd-csi-driver/gce-pd-driver_test.go | 11 +- pkg/gce-pd-csi-driver/server.go | 23 ++- pkg/gce-pd-csi-driver/server_test.go | 212 ++++++++++++++++++++ pkg/metrics/interceptor.go | 23 +++ pkg/metrics/metadata.go | 55 +++++ pkg/metrics/metrics.go | 25 +-- pkg/metrics/metrics_test.go | 17 +- pkg/metrics/metrics_test_util.go | 23 +++ test/sanity/sanity_test.go | 2 +- 12 files changed, 379 insertions(+), 123 deletions(-) create mode 100644 pkg/gce-pd-csi-driver/server_test.go create mode 100644 pkg/metrics/interceptor.go create mode 100644 pkg/metrics/metadata.go create mode 100644 pkg/metrics/metrics_test_util.go diff --git a/cmd/gce-pd-csi-driver/main.go b/cmd/gce-pd-csi-driver/main.go index 4513a82aa..5c6e5a83a 100644 --- a/cmd/gce-pd-csi-driver/main.go +++ b/cmd/gce-pd-csi-driver/main.go @@ -143,6 +143,7 @@ func handle() { }() } + var metricsManager *metrics.MetricsManager = nil if *runControllerService && *httpEndpoint != "" { mm := metrics.NewMetricsManager() mm.InitializeHttpHandler(*httpEndpoint, *metricsPath) @@ -151,6 +152,7 @@ func handle() { if metrics.IsGKEComponentVersionAvailable() { mm.EmitGKEComponentVersion() } + metricsManager = &mm } if len(*extraVolumeLabelsStr) > 0 && !*runControllerService { @@ -261,7 +263,7 @@ func handle() { gce.WaitForOpBackoff.Steps = *waitForOpBackoffSteps gce.WaitForOpBackoff.Cap = *waitForOpBackoffCap - gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing) + gceDriver.Run(*endpoint, *grpcLogCharCap, *enableOtelTracing, metricsManager) } func notEmpty(v string) bool { diff --git a/pkg/gce-pd-csi-driver/controller.go b/pkg/gce-pd-csi-driver/controller.go index 5a47e5da9..decac55a0 100644 --- a/pkg/gce-pd-csi-driver/controller.go +++ b/pkg/gce-pd-csi-driver/controller.go @@ -21,7 +21,6 @@ import ( "math/rand" neturl "net/url" "sort" - "strconv" "strings" "time" @@ -303,12 +302,14 @@ func (gceCS *GCEControllerServer) CreateVolume(ctx context.Context, req *csi.Cre func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req *csi.CreateVolumeRequest) (*csi.CreateVolumeResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("CreateVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() + // Apply Parameters (case-insensitive). We leave validation of + // the values to the cloud provider. + params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags) + metrics.UpdateRequestMetadataFromParams(ctx, params) + if err != nil { + return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) + } + // Validate arguments volumeCapabilities := req.GetVolumeCapabilities() capacityRange := req.GetCapacityRange() @@ -328,17 +329,6 @@ func (gceCS *GCEControllerServer) createVolumeInternal(ctx context.Context, req if err != nil { return nil, status.Errorf(codes.InvalidArgument, "VolumeCapabilities is invalid: %v", err.Error()) } - - // Apply Parameters (case-insensitive). We leave validation of - // the values to the cloud provider. - params, err := gceCS.parameterProcessor().ExtractAndDefaultParameters(req.GetParameters(), gceCS.Driver.extraVolumeLabels, gceCS.Driver.extraTags) - diskTypeForMetric = params.DiskType - enableConfidentialCompute = strconv.FormatBool(params.EnableConfidentialCompute) - hasStoragePools := len(params.StoragePools) > 0 - enableStoragePools = strconv.FormatBool(hasStoragePools) - if err != nil { - return nil, status.Errorf(codes.InvalidArgument, "failed to extract parameters: %v", err.Error()) - } // https://github.com/container-storage-interface/spec/blob/master/spec.md#createvolume // mutable_parameters MUST take precedence over the values from parameters. mutableParams := req.GetMutableParameters() @@ -782,14 +772,6 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re return nil, status.Error(codes.InvalidArgument, "volume ID must be provided") } - diskType := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("ControllerModifyVolume", err, diskType, enableConfidentialCompute, enableStoragePools) - }() - project, volKey, err := common.VolumeIDToKey(volumeID) if err != nil { // Cannot find volume associated with this ID because VolumeID is not in the correct format @@ -806,6 +788,7 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re klog.V(4).Infof("Modify Volume Parameters for %s: %v", volumeID, volumeModifyParams) existingDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionBeta) + metrics.UpdateRequestMetadataFromDisk(ctx, existingDisk) if err != nil { err = fmt.Errorf("Failed to get volume: %w", err) @@ -816,9 +799,9 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re err = status.Errorf(codes.Internal, "failed to get volume : %s", volumeID) return nil, err } - diskType = existingDisk.GetPDType() // Check if the disk supports dynamic IOPS/Throughput provisioning + diskType := existingDisk.GetPDType() supportsIopsChange := gceCS.diskSupportsIopsChange(diskType) supportsThroughputChange := gceCS.diskSupportsThroughputChange(diskType) if !supportsIopsChange && !supportsThroughputChange { @@ -834,8 +817,6 @@ func (gceCS *GCEControllerServer) ControllerModifyVolume(ctx context.Context, re return nil, err } - enableStoragePools = strconv.FormatBool(existingDisk.GetEnableStoragePools()) - err = gceCS.CloudProvider.UpdateDisk(ctx, project, volKey, existingDisk, volumeModifyParams) if err != nil { klog.Errorf("Failed to modify volume %s: %v", volumeID, err) @@ -883,12 +864,6 @@ func getGCEApiVersion(multiWriter bool) gce.GCEAPIVersion { func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) { // List disks with same name var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() existingZones := []string{gceCS.CloudProvider.GetDefaultZone()} zones, err := getDefaultZonesInRegion(ctx, gceCS, existingZones) if err != nil { @@ -910,7 +885,7 @@ func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req * } disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, zonalVolKey, gce.GCEAPIVersionV1) // TODO: Consolidate the parameters here, rather than taking the last. - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + metrics.UpdateRequestMetadataFromDisk(ctx, disk) err := gceCS.CloudProvider.DeleteDisk(ctx, project, zonalVolKey) if err != nil { deleteDiskErrs = append(deleteDiskErrs, gceCS.CloudProvider.DeleteDisk(ctx, project, volKey)) @@ -927,12 +902,6 @@ func (gceCS *GCEControllerServer) deleteMultiZoneDisk(ctx context.Context, req * func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, req *csi.DeleteVolumeRequest, project string, volKey *meta.Key) (*csi.DeleteVolumeResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("DeleteVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() volumeID := req.GetVolumeId() project, volKey, err = gceCS.CloudProvider.RepairUnderspecifiedVolumeKey(ctx, project, volKey) if err != nil { @@ -948,7 +917,7 @@ func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, re } defer gceCS.volumeLocks.Release(volumeID) disk, _ := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + metrics.UpdateRequestMetadataFromDisk(ctx, disk) err = gceCS.CloudProvider.DeleteDisk(ctx, project, volKey) if err != nil { return nil, common.LoggedError("Failed to delete disk: ", err) @@ -960,12 +929,6 @@ func (gceCS *GCEControllerServer) deleteSingleDeviceDisk(ctx context.Context, re func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, req *csi.ControllerPublishVolumeRequest) (*csi.ControllerPublishVolumeResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("ControllerPublishVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() // Only valid requests will be accepted _, _, _, err = gceCS.validateControllerPublishVolumeRequest(ctx, req) if err != nil { @@ -978,7 +941,7 @@ func (gceCS *GCEControllerServer) ControllerPublishVolume(ctx context.Context, r } resp, err, disk := gceCS.executeControllerPublishVolume(ctx, req) - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + metrics.UpdateRequestMetadataFromDisk(ctx, disk) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err) gceCS.errorBackoff.next(backoffId, common.CodeForError(err)) @@ -1192,12 +1155,6 @@ func (gceCS *GCEControllerServer) executeControllerPublishVolume(ctx context.Con func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, req *csi.ControllerUnpublishVolumeRequest) (*csi.ControllerUnpublishVolumeResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("ControllerUnpublishVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() _, _, err = gceCS.validateControllerUnpublishVolumeRequest(ctx, req) if err != nil { return nil, err @@ -1209,7 +1166,7 @@ func (gceCS *GCEControllerServer) ControllerUnpublishVolume(ctx context.Context, return nil, status.Errorf(gceCS.errorBackoff.code(backoffId), "ControllerUnpublish not permitted on node %q due to backoff condition", req.NodeId) } resp, err, disk := gceCS.executeControllerUnpublishVolume(ctx, req) - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + metrics.UpdateRequestMetadataFromDisk(ctx, disk) if err != nil { klog.Infof("For node %s adding backoff due to error for volume %s: %v", req.NodeId, req.VolumeId, err) gceCS.errorBackoff.next(backoffId, common.CodeForError(err)) @@ -1316,12 +1273,6 @@ func (gceCS *GCEControllerServer) parameterProcessor() *common.ParameterProcesso func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context, req *csi.ValidateVolumeCapabilitiesRequest) (*csi.ValidateVolumeCapabilitiesResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("ValidateVolumeCapabilities", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() if req.GetVolumeCapabilities() == nil || len(req.GetVolumeCapabilities()) == 0 { return nil, status.Error(codes.InvalidArgument, "Volume Capabilities must be provided") } @@ -1348,7 +1299,7 @@ func (gceCS *GCEControllerServer) ValidateVolumeCapabilities(ctx context.Context defer gceCS.volumeLocks.Release(volumeID) disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + metrics.UpdateRequestMetadataFromDisk(ctx, disk) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "Could not find disk %v: %v", volKey.Name, err.Error()) @@ -1564,12 +1515,6 @@ func (gceCS *GCEControllerServer) ControllerGetCapabilities(ctx context.Context, func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.CreateSnapshotRequest) (*csi.CreateSnapshotResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("CreateSnapshot", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() // Validate arguments volumeID := req.GetSourceVolumeId() if len(req.Name) == 0 { @@ -1595,7 +1540,7 @@ func (gceCS *GCEControllerServer) CreateSnapshot(ctx context.Context, req *csi.C // Check if volume exists disk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(disk) + metrics.UpdateRequestMetadataFromDisk(ctx, disk) if err != nil { if gce.IsGCENotFoundError(err) { return nil, status.Errorf(codes.NotFound, "CreateSnapshot could not find disk %v: %v", volKey.String(), err.Error()) @@ -1823,12 +1768,6 @@ func isCSISnapshotReady(status string) (bool, error) { func (gceCS *GCEControllerServer) DeleteSnapshot(ctx context.Context, req *csi.DeleteSnapshotRequest) (*csi.DeleteSnapshotResponse, error) { var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("DeleteSnapshot", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() // Validate arguments snapshotID := req.GetSnapshotId() if len(snapshotID) == 0 { @@ -1913,14 +1852,7 @@ func (gceCS *GCEControllerServer) ListSnapshots(ctx context.Context, req *csi.Li } func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, req *csi.ControllerExpandVolumeRequest) (*csi.ControllerExpandVolumeResponse, error) { - var err error - diskTypeForMetric := metrics.DefaultDiskTypeForMetric - enableConfidentialCompute := metrics.DefaultEnableConfidentialCompute - enableStoragePools := metrics.DefaultEnableStoragePools - defer func() { - gceCS.Metrics.RecordOperationErrorMetrics("ControllerExpandVolume", err, diskTypeForMetric, enableConfidentialCompute, enableStoragePools) - }() volumeID := req.GetVolumeId() if len(volumeID) == 0 { return nil, status.Error(codes.InvalidArgument, "ControllerExpandVolume volume ID must be provided") @@ -1950,7 +1882,7 @@ func (gceCS *GCEControllerServer) ControllerExpandVolume(ctx context.Context, re } sourceDisk, err := gceCS.CloudProvider.GetDisk(ctx, project, volKey, gce.GCEAPIVersionV1) - diskTypeForMetric, enableConfidentialCompute, enableStoragePools = metrics.GetMetricParameters(sourceDisk) + metrics.UpdateRequestMetadataFromDisk(ctx, sourceDisk) resizedGb, err := gceCS.CloudProvider.ResizeDisk(ctx, project, volKey, reqBytes) if err != nil { diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver.go b/pkg/gce-pd-csi-driver/gce-pd-driver.go index b41ce6e3d..8bc26ae74 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver.go @@ -27,6 +27,7 @@ import ( "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/deviceutils" gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" metadataservice "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/metadata" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" mountmanager "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/mount-manager" ) @@ -170,12 +171,12 @@ func NewControllerServer(gceDriver *GCEDriver, cloudProvider gce.GCECompute, err } } -func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelTracing bool) { +func (gceDriver *GCEDriver) Run(endpoint string, grpcLogCharCap int, enableOtelTracing bool, metricsManager *metrics.MetricsManager) { maxLogChar = grpcLogCharCap klog.V(4).Infof("Driver: %v", gceDriver.name) //Start the nonblocking GRPC - s := NewNonBlockingGRPCServer(enableOtelTracing) + s := NewNonBlockingGRPCServer(enableOtelTracing, metricsManager) // TODO(#34): Only start specific servers based on a flag. // In the future have this only run specific combinations of servers depending on which version this is. // The schema for that was in util. basically it was just s.start but with some nil servers. diff --git a/pkg/gce-pd-csi-driver/gce-pd-driver_test.go b/pkg/gce-pd-csi-driver/gce-pd-driver_test.go index 7e8138207..628419526 100644 --- a/pkg/gce-pd-csi-driver/gce-pd-driver_test.go +++ b/pkg/gce-pd-csi-driver/gce-pd-driver_test.go @@ -41,8 +41,7 @@ func initBlockingGCEDriver(t *testing.T, cloudDisks []*gce.CloudDisk, readyToExe return initGCEDriverWithCloudProvider(t, fakeBlockingBlockProvider) } -func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver { - vendorVersion := "test-vendor" +func controllerServerForTest(cloudProvider gce.GCECompute) *GCEControllerServer { gceDriver := GetGCEDriver() errorBackoffInitialDuration := 200 * time.Millisecond errorBackoffMaxDuration := 5 * time.Minute @@ -55,7 +54,13 @@ func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) SupportsThroughputChange: []string{"hyperdisk-balanced", "hyperdisk-throughput", "hyperdisk-ml"}, } - controllerServer := NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig) + return NewControllerServer(gceDriver, cloudProvider, errorBackoffInitialDuration, errorBackoffMaxDuration, fallbackRequisiteZones, enableStoragePools, multiZoneVolumeHandleConfig, listVolumesConfig, provisionableDisksConfig) +} + +func initGCEDriverWithCloudProvider(t *testing.T, cloudProvider gce.GCECompute) *GCEDriver { + vendorVersion := "test-vendor" + gceDriver := GetGCEDriver() + controllerServer := controllerServerForTest(cloudProvider) err := gceDriver.SetupGCEDriver(driver, vendorVersion, nil, nil, nil, controllerServer, nil) if err != nil { t.Fatalf("Failed to setup GCE Driver: %v", err) diff --git a/pkg/gce-pd-csi-driver/server.go b/pkg/gce-pd-csi-driver/server.go index 8dd212e7b..573d66b4f 100644 --- a/pkg/gce-pd-csi-driver/server.go +++ b/pkg/gce-pd-csi-driver/server.go @@ -24,6 +24,7 @@ import ( "go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc" "google.golang.org/grpc" "k8s.io/klog/v2" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" csi "github.com/container-storage-interface/spec/lib/go/csi" ) @@ -40,15 +41,16 @@ type NonBlockingGRPCServer interface { ForceStop() } -func NewNonBlockingGRPCServer(enableOtelTracing bool) NonBlockingGRPCServer { - return &nonBlockingGRPCServer{otelTracing: enableOtelTracing} +func NewNonBlockingGRPCServer(enableOtelTracing bool, metricsManager *metrics.MetricsManager) NonBlockingGRPCServer { + return &nonBlockingGRPCServer{otelTracing: enableOtelTracing, metricsManager: metricsManager} } // NonBlocking server type nonBlockingGRPCServer struct { - wg sync.WaitGroup - server *grpc.Server - otelTracing bool + wg sync.WaitGroup + server *grpc.Server + otelTracing bool + metricsManager *metrics.MetricsManager } func (s *nonBlockingGRPCServer) Start(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { @@ -73,10 +75,17 @@ func (s *nonBlockingGRPCServer) ForceStop() { } func (s *nonBlockingGRPCServer) serve(endpoint string, ids csi.IdentityServer, cs csi.ControllerServer, ns csi.NodeServer) { - grpcInterceptor := grpc.UnaryInterceptor(logGRPC) + interceptors := []grpc.UnaryServerInterceptor{logGRPC} + if s.metricsManager != nil { + metricsInterceptor := metrics.MetricInterceptor{ + MetricsManager: s.metricsManager, + } + interceptors = append(interceptors, metricsInterceptor.UnaryInterceptor()) + } if s.otelTracing { - grpcInterceptor = grpc.ChainUnaryInterceptor(logGRPC, otelgrpc.UnaryServerInterceptor()) + interceptors = append(interceptors, otelgrpc.UnaryServerInterceptor()) } + grpcInterceptor := grpc.ChainUnaryInterceptor(interceptors...) opts := []grpc.ServerOption{ grpcInterceptor, diff --git a/pkg/gce-pd-csi-driver/server_test.go b/pkg/gce-pd-csi-driver/server_test.go new file mode 100644 index 000000000..117480574 --- /dev/null +++ b/pkg/gce-pd-csi-driver/server_test.go @@ -0,0 +1,212 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package gceGCEDriver + +import ( + "context" + "fmt" + "os" + "testing" + + csipb "github.com/container-storage-interface/spec/lib/go/csi" + "google.golang.org/grpc" + + csi "github.com/container-storage-interface/spec/lib/go/csi" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/metrics" +) + +func createSocketFile() (string, func(), error) { + tmpDir, err := os.MkdirTemp("", "socket-dir") + if err != nil { + return "", nil, fmt.Errorf("failed to create temporary socket directory: %v", err) + } + cleanup := func() { + os.RemoveAll(tmpDir) + } + socketFile := fmt.Sprintf("%s/test.sock", tmpDir) + return socketFile, cleanup, nil +} + +func createServerClient(mm *metrics.MetricsManager, socketFile string, seedDisks []*gce.CloudDisk) (*grpc.ClientConn, error) { + socketEndpoint := fmt.Sprintf("unix:%s", socketFile) + file, err := os.Create(socketFile) + if err != nil { + return nil, fmt.Errorf("failed to create temporary socket file: %v", err) + } + file.Close() + + metricsPath := "/metrics" + metricEndpoint := "localhost:0" // random port + mm.InitializeHttpHandler(metricEndpoint, metricsPath) + mm.RegisterPDCSIMetric() + + server := NewNonBlockingGRPCServer(false /* enableOtelTracing */, mm) + gceDriver := GetGCEDriver() + identityServer := NewIdentityServer(gceDriver) + fakeCloudProvider, err := gce.CreateFakeCloudProvider(project, zone, seedDisks) + if err != nil { + return nil, fmt.Errorf("failed to create fake cloud provider: %v", err) + } + controllerServer := controllerServerForTest(fakeCloudProvider) + if err := gceDriver.SetupGCEDriver(driver, "test-vendor", nil, nil, identityServer, controllerServer, nil); err != nil { + return nil, fmt.Errorf("failed to setup GCE Driver: %v", err) + } + + if err != nil { + return nil, fmt.Errorf("failed to create volume %v", err) + } + server.Start(socketEndpoint, gceDriver.ids, gceDriver.cs, gceDriver.ns) + + conn, err := grpc.Dial( + socketEndpoint, + grpc.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create client connection") + } + return conn, nil +} + +func TestServerCreateVolumeMetric(t *testing.T) { + mm := metrics.NewMetricsManager() + mm.ResetMetrics() + socketFile, cleanup, err := createSocketFile() + if err != nil { + t.Fatalf("Failed to create socket file: %v", err) + } + defer cleanup() + conn, err := createServerClient(&mm, socketFile, nil) + if err != nil { + t.Fatalf("Failed to create server client: %v", err) + } + controllerClient := csipb.NewControllerClient(conn) + req := &csi.CreateVolumeRequest{ + Name: name, + CapacityRange: stdCapRange, + VolumeCapabilities: stdVolCaps, + Parameters: map[string]string{ + common.ParameterKeyType: "pd-balanced", + }, + } + resp, err := controllerClient.CreateVolume(context.Background(), req) + + if err != nil { + t.Fatalf("CreateVolume returned unexpected error: %v", err) + } + + wantName := "projects/test-project/zones/country-region-zone/disks/test-name" + if resp.Volume.GetVolumeId() != wantName { + t.Fatalf("Response name expected: %v, got: %v", wantName, resp.Volume.GetVolumeId()) + } + + reg := mm.GetRegistry() + metrics, err := reg.Gather() + if err != nil { + t.Fatalf("Gailed to gather metrics: %v", err) + } + if len(metrics) != 1 { + t.Fatalf("Expected 1 metric, got %d", len(metrics)) + } + gotMetric := fmt.Sprint(metrics[0]) + wantMetric := `name:"csidriver_operation_errors" help:"[ALPHA] CSI server side error metrics" type:COUNTER metric: label: label: label: label: label: counter: > ` + if gotMetric != wantMetric { + t.Fatalf("Metric mismatch: \ngot: %v\nwant: %v", gotMetric, wantMetric) + } +} + +func TestServerValidateVolumeCapabilitiesMetric(t *testing.T) { + mm := metrics.NewMetricsManager() + mm.ResetMetrics() + seedDisks := []*gce.CloudDisk{ + createZonalCloudDisk(name), + } + socketFile, cleanup, err := createSocketFile() + if err != nil { + t.Fatalf("Failed to create socket file: %v", err) + } + defer cleanup() + conn, err := createServerClient(&mm, socketFile, seedDisks) + if err != nil { + t.Fatalf("Failed to create server client: %v", err) + } + controllerClient := csipb.NewControllerClient(conn) + req := &csi.ValidateVolumeCapabilitiesRequest{ + VolumeId: fmt.Sprintf("projects/%s/zones/%s/disks/%s", project, zone, name), + VolumeCapabilities: stdVolCaps, + } + resp, err := controllerClient.ValidateVolumeCapabilities(context.Background(), req) + + if err != nil { + t.Fatalf("CreateVolume returned unexpected error: %v", err) + } + + if resp.Confirmed != nil { + t.Fatalf("Expected not nil response, got: %v", resp) + } + + reg := mm.GetRegistry() + metrics, err := reg.Gather() + if err != nil { + t.Fatalf("Gailed to gather metrics: %v", err) + } + if len(metrics) != 1 { + t.Fatalf("Expected 1 metric, got %d", len(metrics)) + } + gotMetric := fmt.Sprint(metrics[0]) + wantMetric := `name:"csidriver_operation_errors" help:"[ALPHA] CSI server side error metrics" type:COUNTER metric: label: label: label: label: label: counter: > ` + if gotMetric != wantMetric { + t.Fatalf("Metric mismatch: \ngot: %v\nwant: %v", gotMetric, wantMetric) + } +} + +func TestServerGetPluginInfoMetric(t *testing.T) { + mm := metrics.NewMetricsManager() + mm.ResetMetrics() + socketFile, cleanup, err := createSocketFile() + if err != nil { + t.Fatalf("Failed to create socket file: %v", err) + } + defer cleanup() + conn, err := createServerClient(&mm, socketFile, nil) + if err != nil { + t.Fatalf("Failed to create server client: %v", err) + } + idClient := csipb.NewIdentityClient(conn) + resp, err := idClient.GetPluginInfo(context.Background(), &csi.GetPluginInfoRequest{}) + if err != nil { + t.Fatalf("GetPluginInfo returned unexpected error: %v", err) + } + + if resp.GetName() != driver { + t.Fatalf("Response name expected: %v, got: %v", driver, resp.GetName()) + } + + reg := mm.GetRegistry() + metrics, err := reg.Gather() + if err != nil { + t.Fatalf("Gailed to gather metrics: %v", err) + } + if len(metrics) != 1 { + t.Fatalf("Expected 1 metric, got %d", len(metrics)) + } + gotMetric := fmt.Sprint(metrics[0]) + wantMetric := `name:"csidriver_operation_errors" help:"[ALPHA] CSI server side error metrics" type:COUNTER metric: label: label: label: label: label: counter: > ` + if gotMetric != wantMetric { + t.Fatalf("Metric mismatch: \ngot: %v\nwant: %v", gotMetric, wantMetric) + } +} diff --git a/pkg/metrics/interceptor.go b/pkg/metrics/interceptor.go new file mode 100644 index 000000000..57269e713 --- /dev/null +++ b/pkg/metrics/interceptor.go @@ -0,0 +1,23 @@ +package metrics + +import ( + "context" + + "google.golang.org/grpc" +) + +type MetricInterceptor struct { + MetricsManager *MetricsManager +} + +func (m *MetricInterceptor) unaryInterceptorInternal(ctx context.Context, req any, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (any, error) { + requestMetadata := newRequestMetadata() + newCtx := context.WithValue(ctx, requestMetadataKey, requestMetadata) + result, err := handler(newCtx, req) + m.MetricsManager.RecordOperationErrorMetrics(info.FullMethod, err, requestMetadata.diskType, requestMetadata.enableConfidentialStorage, requestMetadata.enableStoragePools) + return result, err +} + +func (m *MetricInterceptor) UnaryInterceptor() grpc.UnaryServerInterceptor { + return m.unaryInterceptorInternal +} diff --git a/pkg/metrics/metadata.go b/pkg/metrics/metadata.go new file mode 100644 index 000000000..a2e41e81a --- /dev/null +++ b/pkg/metrics/metadata.go @@ -0,0 +1,55 @@ +package metrics + +import ( + "context" + "strconv" + + "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" + gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" +) + +const ( + // envGKEPDCSIVersion is an environment variable set in the PDCSI controller manifest + // with the current version of the GKE component. + requestMetadataKey = "requestMetadata" +) + +// RequestMetadata represents metadata about a gRPC CSI request +type RequestMetadata struct { + diskType string + enableConfidentialStorage string + enableStoragePools string +} + +func newRequestMetadata() *RequestMetadata { + return &RequestMetadata{ + diskType: DefaultDiskTypeForMetric, + enableConfidentialStorage: DefaultEnableConfidentialCompute, + enableStoragePools: DefaultEnableStoragePools, + } +} + +// MetadataFromContext returns a mutable from a request context +func MetadataFromContext(ctx context.Context) *RequestMetadata { + requestMetadata, _ := ctx.Value(requestMetadataKey).(*RequestMetadata) + return requestMetadata +} + +func UpdateRequestMetadataFromParams(ctx context.Context, params common.DiskParameters) { + metadata := MetadataFromContext(ctx) + if metadata != nil { + metadata.diskType = params.DiskType + metadata.enableConfidentialStorage = strconv.FormatBool(params.EnableConfidentialCompute) + hasStoragePools := len(params.StoragePools) > 0 + metadata.enableStoragePools = strconv.FormatBool(hasStoragePools) + } +} + +func UpdateRequestMetadataFromDisk(ctx context.Context, disk *gce.CloudDisk) { + metadata := MetadataFromContext(ctx) + if metadata != nil && disk != nil { + metadata.diskType = disk.GetPDType() + metadata.enableConfidentialStorage = strconv.FormatBool(disk.GetEnableConfidentialCompute()) + metadata.enableStoragePools = strconv.FormatBool(disk.GetEnableStoragePools()) + } +} diff --git a/pkg/metrics/metrics.go b/pkg/metrics/metrics.go index 8a998f089..52955fcbb 100644 --- a/pkg/metrics/metrics.go +++ b/pkg/metrics/metrics.go @@ -21,13 +21,11 @@ import ( "fmt" "net/http" "os" - "strconv" "google.golang.org/grpc/codes" "k8s.io/component-base/metrics" "k8s.io/klog/v2" "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/common" - gce "sigs.k8s.io/gcp-compute-persistent-disk-csi-driver/pkg/gce-cloud-provider/compute" ) const ( @@ -41,6 +39,11 @@ const ( ) var ( + gkeComponentVersion *metrics.GaugeVec + pdcsiOperationErrorsMetric *metrics.CounterVec +) + +func initMetrics() { // This metric is exposed only from the controller driver component when GKE_PDCSI_VERSION env variable is set. gkeComponentVersion = metrics.NewGaugeVec(&metrics.GaugeOpts{ Name: "component_version", @@ -55,7 +58,7 @@ var ( StabilityLevel: metrics.ALPHA, }, []string{"driver_name", "method_name", "grpc_status_code", "disk_type", "enable_confidential_storage", "enable_storage_pools"}) -) +} type MetricsManager struct { registry metrics.KubeRegistry @@ -93,13 +96,13 @@ func (mm *MetricsManager) recordComponentVersionMetric() error { } func (mm *MetricsManager) RecordOperationErrorMetrics( - operationName string, + fullMethodName string, operationErr error, diskType string, enableConfidentialStorage string, enableStoragePools string) { errCode := errorCodeLabelValue(operationErr) - pdcsiOperationErrorsMetric.WithLabelValues(pdcsiDriverName, "/csi.v1.Controller/"+operationName, errCode, diskType, enableConfidentialStorage, enableStoragePools).Inc() + pdcsiOperationErrorsMetric.WithLabelValues(pdcsiDriverName, fullMethodName, errCode, diskType, enableConfidentialStorage, enableStoragePools).Inc() klog.Infof("Recorded PDCSI operation error code: %q", errCode) } @@ -157,18 +160,6 @@ func IsGKEComponentVersionAvailable() bool { return true } -func GetMetricParameters(disk *gce.CloudDisk) (string, string, string) { - diskType := DefaultDiskTypeForMetric - enableConfidentialStorage := DefaultEnableConfidentialCompute - enableStoragePools := DefaultEnableStoragePools - if disk != nil { - diskType = disk.GetPDType() - enableConfidentialStorage = strconv.FormatBool(disk.GetEnableConfidentialCompute()) - enableStoragePools = strconv.FormatBool(disk.GetEnableStoragePools()) - } - return diskType, enableConfidentialStorage, enableStoragePools -} - // errorCodeLabelValue returns the label value for the given operation error. // This was separated into a helper function for unit testing purposes. func errorCodeLabelValue(operationErr error) string { diff --git a/pkg/metrics/metrics_test.go b/pkg/metrics/metrics_test.go index 62499a575..8ef80cac3 100644 --- a/pkg/metrics/metrics_test.go +++ b/pkg/metrics/metrics_test.go @@ -99,15 +99,18 @@ func TestGetMetricParameters(t *testing.T) { for _, tc := range testCases { t.Logf("Running test: %v", tc.name) - diskType, confidentialCompute, enableStoragePools := GetMetricParameters(tc.disk) - if confidentialCompute != tc.expectedEnableConfidentialCompute { - t.Fatalf("Got confidentialCompute value %q expected %q", confidentialCompute, tc.expectedEnableConfidentialCompute) + ctx := context.TODO() + requestMetadata := newRequestMetadata() + newCtx := context.WithValue(ctx, requestMetadataKey, requestMetadata) + UpdateRequestMetadataFromDisk(newCtx, tc.disk) + if requestMetadata.enableConfidentialStorage != tc.expectedEnableConfidentialCompute { + t.Fatalf("Got confidentialCompute value %q expected %q", requestMetadata.enableConfidentialStorage, tc.expectedEnableConfidentialCompute) } - if diskType != tc.expectedDiskType { - t.Fatalf("Got diskType value %q expected %q", diskType, tc.expectedDiskType) + if requestMetadata.diskType != tc.expectedDiskType { + t.Fatalf("Got diskType value %q expected %q", requestMetadata.enableConfidentialStorage, tc.expectedDiskType) } - if enableStoragePools != tc.expectedEnableStoragePools { - t.Fatalf("Got enableStoragePools value %q expected %q", enableStoragePools, tc.expectedEnableStoragePools) + if requestMetadata.enableStoragePools != tc.expectedEnableStoragePools { + t.Fatalf("Got enableStoragePools value %q expected %q", requestMetadata.enableStoragePools, tc.expectedEnableStoragePools) } } } diff --git a/pkg/metrics/metrics_test_util.go b/pkg/metrics/metrics_test_util.go new file mode 100644 index 000000000..c77142413 --- /dev/null +++ b/pkg/metrics/metrics_test_util.go @@ -0,0 +1,23 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package metrics + +// Test-only method used for resetting metric counts. +func (mm *MetricsManager) ResetMetrics() { + // Re-initialize metrics + initMetrics() +} diff --git a/test/sanity/sanity_test.go b/test/sanity/sanity_test.go index 8086eb251..c6a2dad22 100644 --- a/test/sanity/sanity_test.go +++ b/test/sanity/sanity_test.go @@ -102,7 +102,7 @@ func TestSanity(t *testing.T) { }() go func() { - gceDriver.Run(endpoint, 10000, false) + gceDriver.Run(endpoint, 10000, false /* enableOtelTracing */, nil /* metricsManager */) }() // TODO(#818): Fix failing tests and remove test skip flag.