diff --git a/Makefile b/Makefile
index 63e93eb017..77256f3074 100644
--- a/Makefile
+++ b/Makefile
@@ -278,7 +278,7 @@ unittest-tests: check_test_label
 	@echo "run unittest-tests"
 	$(QUIET) $(ROOT_DIR)/tools/scripts/ginkgo.sh \
 		--cover --coverprofile=./coverage.out --covermode set \
-		--json-report unittestreport.json --label-filter $(E2E_GINKGO_UTLABELS) \
+		--json-report unittestreport.json \
 		-randomize-suites -randomize-all --keep-going --timeout=1h -p \
 		-vv -r $(ROOT_DIR)/pkg $(ROOT_DIR)/cmd
 	$(QUIET) go tool cover -html=./coverage.out -o coverage-all.html
diff --git a/cmd/coordinator/cmd/cni_types.go b/cmd/coordinator/cmd/cni_types.go
index c5b431b5ba..dda783283c 100644
--- a/cmd/coordinator/cmd/cni_types.go
+++ b/cmd/coordinator/cmd/cni_types.go
@@ -269,8 +269,8 @@ func validateRPFilterConfig(rpfilter *int32, coordinatorConfig int64) (*int32, e
 func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
 	if config == nil {
 		return &DetectOptions{
-			Interval: "1s",
-			TimeOut:  "3s",
+			Interval: "10ms",
+			TimeOut:  "100ms",
 			Retry:    3,
 		}, nil
 	}
@@ -284,7 +284,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
 	}
 
 	if config.TimeOut == "" {
-		config.TimeOut = "100ms"
+		config.TimeOut = "500ms"
 	}
 
 	_, err := time.ParseDuration(config.Interval)
diff --git a/docs/concepts/coordinator-zh_CN.md b/docs/concepts/coordinator-zh_CN.md
index 2c7b990543..abaa0cd44a 100644
--- a/docs/concepts/coordinator-zh_CN.md
+++ b/docs/concepts/coordinator-zh_CN.md
@@ -40,7 +40,7 @@ Spiderpool 内置一个叫 `coordinator` 的 CNI meta-plugin, 它在 Main CNI
 | hostRuleTable | 策略路由表号,同主机与 Pod 通信的路由将会存放于这个表号 | 整数型 | optional | 500 |
 | podRPFilter | 设置 Pod 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
 | hostRPFilter | (遗弃)设置节点 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
-| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 100ms) | 对象类型 | optional | 空 |
+| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括发送探测报文的次数(retries: 默认为 3 次)、响应的超时时间(timeout: 默认为 100ms),以及发送报文的间隔(interval: 默认为 10ms,将会在未来版本中移除) | 对象类型 | optional | 空 |
 | logOptions | 日志配置,包括 logLevel(默认为 debug) 和 logFile(默认为 /var/log/spidernet/coordinator.log) | 对象类型 | optional | - |
 
 > 如果您通过 `SpinderMultusConfig CR` 帮助创建 NetworkAttachmentDefinition CR,您可以在 `SpinderMultusConfig` 中配置 `coordinator` (所有字段)。参考: [SpinderMultusConfig](../reference/crd-spidermultusconfig.md)。
diff --git a/docs/concepts/coordinator.md b/docs/concepts/coordinator.md
index 085bd62d06..1d68919ad5 100644
--- a/docs/concepts/coordinator.md
+++ b/docs/concepts/coordinator.md
@@ -41,7 +41,7 @@ Let's delve into how coordinator implements these features.
 | hostRuleTable | The routes on the host that communicates with the pod's underlay IPs will belong to this routing table number | int | optional | 500 |
 | podRPFilter | Set the rp_filter sysctl parameter on the pod, which is recommended to be set to 0 | int | optional | 0 |
 | hostRPFilter | (deprecated)Set the rp_filter sysctl parameter on the node, which is recommended to be set to 0 | int | optional | 0 |
-| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 100ms) | obejct | optional | nil |
+| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including the number of probe packets to send (retries: default is 3), the response timeout (timeout: default is 100ms), and the packet sending interval (interval: default is 10ms, which will be removed in a future version) | object | optional | nil |
 | logOptions | The configuration of logging, including logLevel(default is debug) and logFile(default is /var/log/spidernet/coordinator.log) | obejct | optional | nil |
 
 > You can configure `coordinator` by specifying all the relevant fields in `SpinderMultusConfig` if a NetworkAttachmentDefinition CR is created via `SpinderMultusConfig CR`. For more information, please refer to [SpinderMultusConfig](../reference/crd-spidermultusconfig.md).
diff --git a/pkg/coordinatormanager/coordinator_informer.go b/pkg/coordinatormanager/coordinator_informer.go
index 792422e4f8..18c92de1ad 100644
--- a/pkg/coordinatormanager/coordinator_informer.go
+++ b/pkg/coordinatormanager/coordinator_informer.go
@@ -8,7 +8,6 @@ import (
 	"fmt"
 	"net"
 	"reflect"
-	"regexp"
 	"sort"
 	"strings"
 	"time"
@@ -470,7 +469,7 @@ func (cc *CoordinatorController) updatePodAndServerCIDR(ctx context.Context, log
 
 	if err == nil {
 		logger.Sugar().Info("Trying to fetch the ClusterCIDR from kube-system/kubeadm-config")
-		k8sPodCIDR, k8sServiceCIDR, err = ExtractK8sCIDRFromKubeadmConfigMap(&cm)
+		k8sPodCIDR, k8sServiceCIDR, err = utils.ExtractK8sCIDRFromKubeadmConfigMap(&cm)
 		if err == nil {
 			// Success to get ClusterCIDR from kubeadm-config
 			logger.Sugar().Infof("Success get CIDR from kubeadm-config: PodCIDR=%v, ServiceCIDR=%v", k8sPodCIDR, k8sServiceCIDR)
@@ -504,7 +503,7 @@ func (cc *CoordinatorController) updatePodAndServerCIDR(ctx context.Context, log
 			return coordCopy
 		}
 
-		k8sPodCIDR, k8sServiceCIDR = ExtractK8sCIDRFromKCMPod(&podList.Items[0])
+		k8sPodCIDR, k8sServiceCIDR = utils.ExtractK8sCIDRFromKCMPod(&podList.Items[0])
 		logger.Sugar().Infof("kube-controller-manager k8sPodCIDR %v, k8sServiceCIDR %v", k8sPodCIDR, k8sServiceCIDR)
 	}
 
@@ -781,90 +780,6 @@ func (cc *CoordinatorController) updateServiceCIDR(logger *zap.Logger, coordCopy
 	return nil
 }
 
-func ExtractK8sCIDRFromKubeadmConfigMap(cm *corev1.ConfigMap) ([]string, []string, error) {
-	if cm == nil {
-		return nil, nil, fmt.Errorf("kubeadm configmap is unexpected to nil")
-	}
-	var podCIDR, serviceCIDR []string
-
-	clusterConfig, exists := cm.Data["ClusterConfiguration"]
-	if !exists {
-		return podCIDR, serviceCIDR, fmt.Errorf("unable to get kubeadm configmap ClusterConfiguration")
-	}
-
-	podReg := regexp.MustCompile(`podSubnet:\s*(\S+)`)
-	serviceReg := regexp.MustCompile(`serviceSubnet:\s*(\S+)`)
-
-	podSubnets := podReg.FindStringSubmatch(clusterConfig)
-	serviceSubnets := serviceReg.FindStringSubmatch(clusterConfig)
-
-	if len(podSubnets) > 1 {
-		for _, cidr := range strings.Split(podSubnets[1], ",") {
-			cidr = strings.TrimSpace(cidr)
-			_, _, err := net.ParseCIDR(cidr)
-			if err != nil {
-				continue
-			}
-			podCIDR = append(podCIDR, cidr)
-		}
-	}
-
-	if len(serviceSubnets) > 1 {
-		for _, cidr := range strings.Split(serviceSubnets[1], ",") {
-			cidr = strings.TrimSpace(cidr)
-			_, _, err := net.ParseCIDR(cidr)
-			if err != nil {
-				continue
-			}
-			serviceCIDR = append(serviceCIDR, cidr)
-		}
-	}
-
-	return podCIDR, serviceCIDR, nil
-}
-
-func ExtractK8sCIDRFromKCMPod(kcm *corev1.Pod) ([]string, []string) {
-	var podCIDR, serviceCIDR []string
-
-	podReg := regexp.MustCompile(`--cluster-cidr=(.*)`)
-	serviceReg := regexp.MustCompile(`--service-cluster-ip-range=(.*)`)
-
-	var podSubnets, serviceSubnets []string
-	for _, l := range kcm.Spec.Containers[0].Command {
-		if len(podSubnets) == 0 {
-			podSubnets = podReg.FindStringSubmatch(l)
-		}
-		if len(serviceSubnets) == 0 {
-			serviceSubnets = serviceReg.FindStringSubmatch(l)
-		}
-		if len(podSubnets) != 0 && len(serviceSubnets) != 0 {
-			break
-		}
-	}
-
-	if len(podSubnets) != 0 {
-		for _, cidr := range strings.Split(podSubnets[1], ",") {
-			_, _, err := net.ParseCIDR(cidr)
-			if err != nil {
-				continue
-			}
-			podCIDR = append(podCIDR, cidr)
-		}
-	}
-
-	if len(serviceSubnets) != 0 {
-		for _, cidr := range strings.Split(serviceSubnets[1], ",") {
-			_, _, err := net.ParseCIDR(cidr)
-			if err != nil {
-				continue
-			}
-			serviceCIDR = append(serviceCIDR, cidr)
-		}
-	}
-
-	return podCIDR, serviceCIDR
-}
-
 func fetchType(cniDir string) (string, error) {
 	defaultCniName, err := utils.GetDefaultCniName(cniDir)
 	if err != nil {
diff --git a/pkg/coordinatormanager/coordinator_informer_test.go b/pkg/coordinatormanager/coordinator_informer_test.go
deleted file mode 100644
index 5dac9062a2..0000000000
--- a/pkg/coordinatormanager/coordinator_informer_test.go
+++ /dev/null
@@ -1,54 +0,0 @@
-// Copyright 2022 Authors of spidernet-io
-// SPDX-License-Identifier: Apache-2.0
-
-package coordinatormanager
-
-import (
-	"encoding/json"
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-	corev1 "k8s.io/api/core/v1"
-)
-
-var _ = Describe("Coordinator Manager", Label("coordinatorinformer", "informer_test"), Serial, func() {
-	DescribeTable("should extract CIDRs correctly",
-		func(testName, cmStr string, expectedPodCIDR, expectedServiceCIDR []string, expectError bool) {
-			var cm corev1.ConfigMap
-			err := json.Unmarshal([]byte(cmStr), &cm)
-			Expect(err).NotTo(HaveOccurred(), "Failed to unmarshal configMap: %v\n", err)
-
-			podCIDR, serviceCIDR, err := ExtractK8sCIDRFromKubeadmConfigMap(&cm)
-
-			if expectError {
-				Expect(err).To(HaveOccurred(), "Expected an error but got none")
-			} else {
-				Expect(err).NotTo(HaveOccurred(), "Did not expect an error but got one: %v", err)
-			}
-
-			Expect(podCIDR).To(Equal(expectedPodCIDR), "Pod CIDR does not match")
-			Expect(serviceCIDR).To(Equal(expectedServiceCIDR), "Service CIDR does not match")
-		},
-		Entry("ClusterConfiguration",
-			"ClusterConfiguration",
-			clusterConfigurationJson,
-			[]string{"192.168.165.0/24"},
-			[]string{"245.100.128.0/18"},
-			false,
-		),
-		Entry("No ClusterConfiguration",
-			"No ClusterConfiguration",
-			noClusterConfigurationJson,
-			nil,
-			nil,
-			true,
-		),
-		Entry("No CIDR",
-			"No CIDR",
-			noCIDRJson,
-			nil,
-			nil,
-			false,
-		),
-	)
-
-})
diff --git a/pkg/coordinatormanager/coordinatormanager_suite_test.go b/pkg/coordinatormanager/coordinatormanager_suite_test.go
deleted file mode 100644
index ff9687ab24..0000000000
--- a/pkg/coordinatormanager/coordinatormanager_suite_test.go
+++ /dev/null
@@ -1,61 +0,0 @@
-// Copyright 2022 Authors of spidernet-io
-// SPDX-License-Identifier: Apache-2.0
-
-package coordinatormanager
-
-import (
-	"testing"
-
-	. "github.com/onsi/ginkgo/v2"
-	. "github.com/onsi/gomega"
-)
-
-var (
-	clusterConfigurationJson   string
-	noClusterConfigurationJson string
-	noCIDRJson                 string
-)
-
-func TestCoordinatorManager(t *testing.T) {
-	RegisterFailHandler(Fail)
-	RunSpecs(t, "CoordinatorManager Suite")
-}
-
-var _ = BeforeSuite(func() {
-	clusterConfigurationJson = `
-	{
-		"apiVersion": "v1",
-		"data": {
-			"ClusterConfiguration": "networking:\n dnsDomain: cluster.local\n podSubnet: 192.168.165.0/24\n serviceSubnet: 245.100.128.0/18"
-		},
-		"kind": "ConfigMap",
-		"metadata": {
-			"name": "kubeadm-config",
-			"namespace": "kube-system"
-		}
-	}`
-	noClusterConfigurationJson = `
-	{
-		"apiVersion": "v1",
-		"data": {
-			"ClusterStatus": "apiEndpoints:\n anolios79:\n advertiseAddress: 192.168.165.128\n bindPort: 6443\napiVersion: kubeadm.k8s.io/v1beta2\nkind: ClusterStatus\n"
-		},
-		"kind": "ConfigMap",
-		"metadata": {
-			"name": "kubeadm-config",
-			"namespace": "kube-system"
-		}
-	}`
-	noCIDRJson = `
-	{
-		"apiVersion": "v1",
-		"data": {
-			"ClusterConfiguration": "clusterName: spider\ncontrolPlaneEndpoint: spider-control-plane:6443\ncontrollerManager:\n"
-		},
-		"kind": "ConfigMap",
-		"metadata": {
-			"name": "kubeadm-config",
-			"namespace": "kube-system"
-		}
-	}`
-})
diff --git a/pkg/errgroup/errgroup.go b/pkg/errgroup/errgroup.go
index fdcb3846db..cb716eb207 100644
--- a/pkg/errgroup/errgroup.go
+++ b/pkg/errgroup/errgroup.go
@@ -11,6 +11,7 @@
 package errgroup
 
 import (
+	"context"
 	"fmt"
 	"runtime"
 	"sync"
@@ -43,6 +44,16 @@ func (g *Group) done() {
 	g.wg.Done()
 }
 
+// WithContext returns a new Group and an associated Context derived from ctx.
+//
+// The derived Context is canceled the first time a function passed to Go
+// returns a non-nil error or the first time Wait returns, whichever occurs
+// first.
+func WithContext(ctx context.Context) (*Group, context.Context) {
+	ctx, cancel := context.WithCancelCause(ctx)
+	return &Group{cancel: cancel}, ctx
+}
+
 // Wait blocks until all function calls from the Go method have returned, then
 // returns the first non-nil error (if any) from them.
 func (g *Group) Wait() error {
diff --git a/pkg/ippoolmanager/config_test.go b/pkg/ippoolmanager/config_test.go
new file mode 100644
index 0000000000..a457da38b0
--- /dev/null
+++ b/pkg/ippoolmanager/config_test.go
@@ -0,0 +1,77 @@
+// Copyright 2024 Authors of spidernet-io
+// SPDX-License-Identifier: Apache-2.0
+package ippoolmanager
+
+import (
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+var _ = Describe("IPPoolManagerConfig", Label("ippool_manager_test"), func() {
+	var config IPPoolManagerConfig
+
+	Describe("setDefaultsForIPPoolManagerConfig", func() {
+		Context("when MaxAllocatedIPs is nil", func() {
+			BeforeEach(func() {
+				config = IPPoolManagerConfig{
+					MaxAllocatedIPs:        nil,
+					EnableKubevirtStaticIP: false,
+				}
+			})
+
+			It("should set MaxAllocatedIPs to default value", func() {
+				result := setDefaultsForIPPoolManagerConfig(config)
+				Expect(result.MaxAllocatedIPs).NotTo(BeNil())
+				Expect(*result.MaxAllocatedIPs).To(Equal(defaultMaxAllocatedIPs))
+			})
+		})
+
+		Context("when MaxAllocatedIPs is set", func() {
+			BeforeEach(func() {
+				maxIPs := 3000
+				config = IPPoolManagerConfig{
+					MaxAllocatedIPs:        &maxIPs,
+					EnableKubevirtStaticIP: true,
+				}
+			})
+
+			It("should retain the provided MaxAllocatedIPs value", func() {
+				result := setDefaultsForIPPoolManagerConfig(config)
+				Expect(result.MaxAllocatedIPs).NotTo(BeNil())
+				Expect(*result.MaxAllocatedIPs).To(Equal(3000))
+			})
+		})
+
+		Context("when EnableKubevirtStaticIP is true", func() {
+			BeforeEach(func() {
+				config = IPPoolManagerConfig{
+					MaxAllocatedIPs:        nil,
+					EnableKubevirtStaticIP: true,
+				}
+			})
+
+			It("should set MaxAllocatedIPs to default value", func() {
+				result := setDefaultsForIPPoolManagerConfig(config)
+				Expect(result.MaxAllocatedIPs).NotTo(BeNil())
+				Expect(*result.MaxAllocatedIPs).To(Equal(defaultMaxAllocatedIPs))
+				Expect(result.EnableKubevirtStaticIP).To(BeTrue())
+			})
+		})
+
+		Context("when EnableKubevirtStaticIP is false", func() {
+			BeforeEach(func() {
+				config = IPPoolManagerConfig{
+					MaxAllocatedIPs:        nil,
+					EnableKubevirtStaticIP: false,
+				}
+			})
+
+			It("should set MaxAllocatedIPs to default value", func() {
+				result := setDefaultsForIPPoolManagerConfig(config)
+				Expect(result.MaxAllocatedIPs).NotTo(BeNil())
+				Expect(*result.MaxAllocatedIPs).To(Equal(defaultMaxAllocatedIPs))
+				Expect(result.EnableKubevirtStaticIP).To(BeFalse())
+			})
+		})
+	})
+})
diff --git a/pkg/networking/gwconnection/connection.go b/pkg/networking/gwconnection/connection.go
index 2dbe097fcb..6a81d5a183 100644
--- a/pkg/networking/gwconnection/connection.go
+++ b/pkg/networking/gwconnection/connection.go
@@ -72,37 +72,35 @@ func (dg *DetectGateway) ArpingOverIface() error {
 	defer client.Close()
 
 	gwNetIP := netip.MustParseAddr(dg.V4Gw.String())
-	var gwHwAddr net.HardwareAddr
-	for i := 0; i < dg.retries; i++ {
+	if err = client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
+		dg.logger.Sugar().Errorf("failed to set deadline: %v", err)
+		return err
+	}
 
-		err = client.SetReadDeadline(time.Now().Add(dg.timeout))
+	for i := 0; i < dg.retries; i++ {
+		dg.logger.Sugar().Debugf("[Retry: %v]try to send the arp request", i+1)
+		err := client.Request(gwNetIP)
 		if err != nil {
-			dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err)
-			time.Sleep(dg.interval)
+			dg.logger.Sugar().Errorf("[Retry: %v]failed to send the arp request: %v", i+1, err)
 			continue
 		}
-		dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1)
-		gwHwAddr, err = client.Resolve(gwNetIP)
+	}
+
+	// Loop and wait for replies
+	for {
+		res, _, err := client.Read()
 		if err != nil {
-			dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err)
-			time.Sleep(dg.interval)
-			continue
+			dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err)
+			return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
 		}
 
-		if gwHwAddr != nil {
-			dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
-			return nil
+		if res.Operation != arp.OperationReply || res.SenderIP != gwNetIP {
+			continue
 		}
-		time.Sleep(dg.interval)
-	}
-
-	if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
-		dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), err)
-		return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
+		dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, res.SenderHardwareAddr.String())
+		return nil
 	}
-
-	return fmt.Errorf("failed to checking the gateway %s if is reachable: %w", dg.V4Gw.String(), err)
 }
 
 func (dg *DetectGateway) NDPingOverIface() error {
@@ -127,17 +125,13 @@ func (dg *DetectGateway) NDPingOverIface() error {
 		},
 	}
 
-	ticker := time.NewTicker(dg.interval)
-	defer ticker.Stop()
-
 	var gwHwAddr string
 	for i := 0; i < dg.retries && gwHwAddr == ""; i++ {
-		<-ticker.C
 		gwHwAddr, err = dg.sendReceive(client, msg)
 		if err != nil {
 			dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err)
 		} else if gwHwAddr != "" {
-			dg.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", dg.V6Gw.String(), gwHwAddr)
+			dg.logger.Sugar().Infof("gateway %s is reachable, it is located at %s", dg.V6Gw.String(), gwHwAddr)
 			return nil
 		}
 	}
@@ -159,6 +153,11 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e
 		return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err)
 	}
 
+	if err := client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
+		dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
+		return "", fmt.Errorf("failed to set deadline: %v", err)
+	}
+
 	// we send a gratuitous neighbor solicitation to checking if ip is conflict
 	err = client.WriteTo(m, nil, snm)
 	if err != nil {
@@ -166,11 +165,6 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e
 		return "", fmt.Errorf("failed to send message: %v", err)
 	}
 
-	if err := client.SetReadDeadline(time.Now().Add(dg.timeout)); err != nil {
-		dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
-		return "", fmt.Errorf("failed to set deadline: %v", err)
-	}
-
 	msg, _, _, err := client.ReadFrom()
 	if err != nil {
 		return "", err
diff --git a/pkg/networking/ipchecking/ipchecking.go b/pkg/networking/ipchecking/ipchecking.go
index eb7f91e691..c4053e8104 100644
--- a/pkg/networking/ipchecking/ipchecking.go
+++ b/pkg/networking/ipchecking/ipchecking.go
@@ -4,12 +4,10 @@
 package ipchecking
 
 import (
-	"context"
 	"errors"
 	"fmt"
 	"net"
 	"net/netip"
-	"runtime"
 	"time"
 
 	types100 "github.com/containernetworking/cni/pkg/types/100"
@@ -17,6 +15,7 @@ import (
 	"github.com/mdlayher/arp"
 	"github.com/mdlayher/ethernet"
 	"github.com/mdlayher/ndp"
+	"github.com/spidernet-io/spiderpool/pkg/constant"
 	"github.com/spidernet-io/spiderpool/pkg/errgroup"
 	"go.uber.org/zap"
 )
@@ -59,7 +58,7 @@ func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS,
 }
 
 func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface string, errg *errgroup.Group) {
-	ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries))
+	ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries), zap.String("timeout", ipc.timeout.String()))
 	if len(ipconfigs) == 0 {
 		ipc.logger.Info("No ips found in pod, ignore pod ip's conflict checking")
 		return
@@ -97,91 +96,50 @@ func (ipc *IPChecker) ipCheckingByARP() error {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-
 	defer ipc.arpClient.Close()
 
-	var conflictingMac string
 	var err error
-	// start a goroutine to receive arp response
-	go func() {
-		runtime.LockOSThread()
-
-		// switch to pod's netns
-		if e := ipc.netns.Set(); e != nil {
-			ipc.logger.Warn("Detect IP Conflict: failed to switch to pod's net namespace")
+	for i := 0; i < ipc.retries; i++ {
+		ipc.logger.Sugar().Debugf("[Retry: %v]try to arping the ip", i+1)
+		if err = ipc.arpClient.SetDeadline(time.Now().Add(ipc.timeout)); err != nil {
+			ipc.logger.Error("[ARP]failed to set deadline", zap.Error(err))
+			continue
 		}
-		defer func() {
-			err := ipc.hostNs.Set() // switch back
-			if err == nil {
-				// Unlock the current thread only when we successfully switched back
-				// to the original namespace; otherwise leave the thread locked which
-				// will force the runtime to scrap the current thread, that is maybe
-				// not as optimal but at least always safe to do.
-				runtime.UnlockOSThread()
-			}
-		}()
-
-		var packet *arp.Packet
-		for {
-			select {
-			case <-ctx.Done():
-				return
-			default:
-				packet, _, err = ipc.arpClient.Read()
-				if err != nil {
-					cancel()
-					return
-				}
+		// we send a gratuitous ARP to check whether the IP conflicts
+		// we use DAD mode (duplicate address detection), so
+		// we set the source IP to 0.0.0.0
+		packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
+		if err != nil {
+			return err
+		}
 
-				if packet.Operation == arp.OperationReply {
-					// found reply and simple check if the reply packet is we want.
-					if packet.SenderIP.Compare(ipc.ip4) == 0 {
-						conflictingMac = packet.SenderHardwareAddr.String()
-						cancel()
-						return
-					}
-				}
-			}
+		err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
+		if err != nil {
+			ipc.logger.Error("[ARP]failed to send message", zap.Error(err))
+			continue
 		}
-	}()
 
-	// we send a gratuitous arp to checking if ip is conflict
-	// we use dad mode(duplicate address detection mode), so
-	// we set source ip to 0.0.0.0
-	packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
-	if err != nil {
-		cancel()
-		return err
-	}
+		packet, _, err = ipc.arpClient.Read()
+		if err != nil {
+			ipc.logger.Error("[ARP]failed to receive message", zap.Error(err))
+			continue
+		}
 
-	ticker := time.NewTicker(ipc.interval)
-	defer ticker.Stop()
-
-	stop := false
-	for i := 0; i < ipc.retries && !stop; i++ {
-		select {
-		case <-ctx.Done():
-			stop = true
-		case <-ticker.C:
-			err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
-			if err != nil {
-				stop = true
-			}
+		if packet.Operation != arp.OperationReply || packet.SenderIP.Compare(ipc.ip4) != 0 {
+			continue
 		}
-	}
 
-	if err != nil {
-		return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err)
+		// found ip conflicting
+		ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", packet.SenderHardwareAddr.String()))
+		return fmt.Errorf("%w: pod's interface %s with a conflicting ip %s, %s is located at %s",
+			constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), packet.SenderHardwareAddr.String())
 	}
 
-	if conflictingMac != "" {
-		// found ip conflicting
-		ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", conflictingMac))
-		return fmt.Errorf("pod's interface %s with an conflicting ip %s, %s is located at %s", ipc.ifi.Name,
-			ipc.ip4.String(), ipc.ip4.String(), conflictingMac)
+	if err != nil {
+		if neterr, ok := err.(net.Error); ok && !neterr.Timeout() {
+			return fmt.Errorf("failed to check whether ip %s is conflicting: %v", ipc.ip4.String(), err)
+		}
 	}
 
 	ipc.logger.Debug("No ipv4 address conflict", zap.String("IPv4 address", ipc.ip4.String()))
@@ -227,7 +185,9 @@ func (ipc *IPChecker) ipCheckingByNDP() error {
 func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) {
 	var hwAddr string
 	var err error
+
 	for i := 0; i < ipc.retries; i++ {
+		ipc.logger.Sugar().Debugf("[Retry: %v]try to ndping the ip", i+1)
 		hwAddr, err = ipc.sendReceive(msg)
 		switch err {
 		case errRetry:
@@ -267,7 +227,7 @@ func (ipc *IPChecker) sendReceive(m ndp.Message) (string, error) {
 		return "", fmt.Errorf("failed to send message: %v", err)
 	}
 
-	if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.interval)); err != nil {
+	if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.timeout)); err != nil {
 		ipc.logger.Error("[NDP]failed to set deadline", zap.Error(err))
 		return "", fmt.Errorf("failed to set deadline: %v", err)
 	}
diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go
index 1678a7e709..d60e459e99 100644
--- a/pkg/utils/utils.go
+++ b/pkg/utils/utils.go
@@ -5,10 +5,13 @@ package utils
 
 import (
 	"fmt"
+	"net"
+	"regexp"
 	"sort"
 	"strings"
 
 	"github.com/containernetworking/cni/libcni"
+	corev1 "k8s.io/api/core/v1"
 )
 
 // GetDefaultCNIConfPath according to the provided CNI file path (default is /etc/cni/net.d),
@@ -68,3 +71,100 @@ func fetchCniNameFromPath(cniPath string) (string, error) {
 	}
 	return conf.Network.Name, nil
 }
+
+func ExtractK8sCIDRFromKubeadmConfigMap(cm *corev1.ConfigMap) ([]string, []string, error) {
+	if cm == nil {
+		return nil, nil, fmt.Errorf("kubeadm configmap is unexpectedly nil")
+	}
+	var podCIDR, serviceCIDR []string
+
+	clusterConfig, exists := cm.Data["ClusterConfiguration"]
+	if !exists {
+		return podCIDR, serviceCIDR, fmt.Errorf("unable to get kubeadm configmap ClusterConfiguration")
+	}
+
+	podReg := regexp.MustCompile(`podSubnet:\s*(\S+)`)
+	serviceReg := regexp.MustCompile(`serviceSubnet:\s*(\S+)`)
+
+	podSubnets := podReg.FindStringSubmatch(clusterConfig)
+	serviceSubnets := serviceReg.FindStringSubmatch(clusterConfig)
+
+	if len(podSubnets) > 1 {
+		for _, cidr := range strings.Split(podSubnets[1], ",") {
+			cidr = strings.TrimSpace(cidr)
+			_, _, err := net.ParseCIDR(cidr)
+			if err != nil {
+				continue
+			}
+			podCIDR = append(podCIDR, cidr)
+		}
+	}
+
+	if len(serviceSubnets) > 1 {
+		for _, cidr := range strings.Split(serviceSubnets[1], ",") {
+			cidr = strings.TrimSpace(cidr)
+			_, _, err := net.ParseCIDR(cidr)
+			if err != nil {
+				continue
+			}
+			serviceCIDR = append(serviceCIDR, cidr)
+		}
+	}
+
+	return podCIDR, serviceCIDR, nil
+}
+
+func ExtractK8sCIDRFromKCMPod(kcm *corev1.Pod) ([]string, []string) {
+	var podCIDR, serviceCIDR []string
+
+	podReg := regexp.MustCompile(`--cluster-cidr=(.*)`)
+	serviceReg := regexp.MustCompile(`--service-cluster-ip-range=(.*)`)
+
+	var podSubnets, serviceSubnets []string
+	findSubnets := func(l string) {
+		if len(podSubnets) == 0 {
+			podSubnets = podReg.FindStringSubmatch(l)
+		}
+		if len(serviceSubnets) == 0 {
+			serviceSubnets = serviceReg.FindStringSubmatch(l)
+		}
+	}
+
+	for _, l := range kcm.Spec.Containers[0].Command {
+		findSubnets(l)
+		if len(podSubnets) != 0 && len(serviceSubnets) != 0 {
+			break
+		}
+	}
+
+	if len(podSubnets) == 0 || len(serviceSubnets) == 0 {
+		for _, l := range kcm.Spec.Containers[0].Args {
+			findSubnets(l)
+			if len(podSubnets) != 0 && len(serviceSubnets) != 0 {
+				break
+			}
+		}
+	}
+
+	if len(podSubnets) != 0 {
+		for _, cidr := range strings.Split(podSubnets[1], ",") {
+			_, _, err := net.ParseCIDR(cidr)
+			if err != nil {
+				continue
+			}
+			podCIDR = append(podCIDR, cidr)
+		}
+	}
+
+	if len(serviceSubnets) != 0 {
+		for _, cidr := range strings.Split(serviceSubnets[1], ",") {
+			_, _, err := net.ParseCIDR(cidr)
+			if err != nil {
+				continue
+			}
+			serviceCIDR = append(serviceCIDR, cidr)
+		}
+	}
+
+	return podCIDR, serviceCIDR
+}
diff --git a/pkg/utils/utils_suite_test.go b/pkg/utils/utils_suite_test.go
new file mode 100644
index 0000000000..44f242e6d6
--- /dev/null
+++ b/pkg/utils/utils_suite_test.go
@@ -0,0 +1,15 @@
+// Copyright 2024 Authors of spidernet-io
+// SPDX-License-Identifier: Apache-2.0
+package utils
+
+import (
+	"testing"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+)
+
+func TestUtils(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "Utils Suite", Label("utils", "unittest"))
+}
diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go
new file mode 100644
index 0000000000..c15ce5f0a1
--- /dev/null
+++ b/pkg/utils/utils_test.go
@@ -0,0 +1,204 @@
+// Copyright 2024 Authors of spidernet-io
+// SPDX-License-Identifier: Apache-2.0
+
+package utils
+
+import (
+	"encoding/json"
+	"os"
+
+	. "github.com/onsi/ginkgo/v2"
+	. "github.com/onsi/gomega"
+	corev1 "k8s.io/api/core/v1"
+)
+
+var _ = Describe("Utils", Label("utils"), Serial, func() {
+	var clusterConfigurationJson = `{
+	"apiVersion": "v1",
+	"data": {
+		"ClusterConfiguration": "networking:\n dnsDomain: cluster.local\n podSubnet: 192.168.165.0/24\n serviceSubnet: 245.100.128.0/18"
+	},
+	"kind": "ConfigMap",
+	"metadata": {
+		"name": "kubeadm-config",
+		"namespace": "kube-system"
+	}
+}`
+
+	var noClusterConfigurationJson = `{
+	"apiVersion": "v1",
+	"data": {
+		"ClusterStatus": "apiEndpoints:\n anolios79:\n advertiseAddress: 192.168.165.128\n bindPort: 6443\napiVersion: kubeadm.k8s.io/v1beta2\nkind: ClusterStatus\n"
+	},
+	"kind": "ConfigMap",
+	"metadata": {
+		"name": "kubeadm-config",
+		"namespace": "kube-system"
+	}
+}`
+	var noCIDRJson = `{
+	"apiVersion": "v1",
+	"data": {
+		"ClusterConfiguration": "clusterName: spider\ncontrolPlaneEndpoint: spider-control-plane:6443\ncontrollerManager:\n"
+	},
+	"kind": "ConfigMap",
+	"metadata": {
+		"name": "kubeadm-config",
+		"namespace": "kube-system"
+	}
+}`
+	DescribeTable("should extract CIDRs correctly",
+		func(testName, cmStr string, expectedPodCIDR, expectedServiceCIDR []string, expectError bool) {
+			var cm corev1.ConfigMap
+			err := json.Unmarshal([]byte(cmStr), &cm)
+			Expect(err).NotTo(HaveOccurred(), "Failed to unmarshal configMap: %v\n", err)
+
+			podCIDR, serviceCIDR, err := ExtractK8sCIDRFromKubeadmConfigMap(&cm)
+
+			if expectError {
+				Expect(err).To(HaveOccurred(), "Expected an error but got none")
+			} else {
+				Expect(err).NotTo(HaveOccurred(), "Did not expect an error but got one: %v", err)
+			}
+
+			Expect(podCIDR).To(Equal(expectedPodCIDR), "Pod CIDR does not match")
+			Expect(serviceCIDR).To(Equal(expectedServiceCIDR), "Service CIDR does not match")
+		},
+		Entry("ClusterConfiguration",
+			"ClusterConfiguration",
+			clusterConfigurationJson,
+			[]string{"192.168.165.0/24"},
+			[]string{"245.100.128.0/18"},
+			false,
+		),
+		Entry("No ClusterConfiguration",
+			"No ClusterConfiguration",
+			noClusterConfigurationJson,
+			nil,
+			nil,
+			true,
+		),
+		Entry("No CIDR",
+			"No CIDR",
+			noCIDRJson,
+			nil,
+			nil,
+			false,
+		),
+	)
+
+	Context("GetDefaultCniName", func() {
+		var cniFile string
+
+		It("Test GetDefaultCniName", func() {
+			cniFile = "10-calico.conflist"
+			tempDir := "/tmp" + "/" + cniFile
+
+			err := os.WriteFile(tempDir, []byte(`{
+			"name": "calico",
+			"cniVersion": "0.4.0",
+			"plugins": [
+				{
+					"type": "calico",
+					"etcd_endpoints": "http://127.0.0.1:2379",
+					"log_level": "info",
+					"ipam": {
+						"type": "calico-ipam"
+					},
+					"policy": {
+						"type": "k8s"
+					}
+				}
+			]
+			}`), 0644)
+			Expect(err).NotTo(HaveOccurred())
+			cniName, err := GetDefaultCniName("/tmp")
+			Expect(err).NotTo(HaveOccurred())
+
+			Expect(cniName).To(Equal("calico"))
+			// Expect(os.RemoveAll(tempCniDir)).NotTo(HaveOccurred())
+		})
+	})
+
+	Describe("ExtractK8sCIDRFromKCMPod", func() {
+		var (
+			pod *corev1.Pod
+		)
+
+		BeforeEach(func() {
+			pod = &corev1.Pod{
+				Spec: corev1.PodSpec{
+					Containers: []corev1.Container{
+						{
+							Command: []string{
+								"/path/to/kube-controller-manager",
+								"--cluster-cidr=192.168.0.0/16",
+								"--service-cluster-ip-range=10.96.0.0/12",
+							},
+							Args: []string{
+								"--cluster-cidr=192.168.0.0/16",
+								"--service-cluster-ip-range=10.96.0.0/12",
+							},
+						},
+					},
+				},
+			}
+		})
+
+		Context("when valid CIDR values are provided", func() {
+			It("should extract pod CIDR and service CIDR correctly", func() {
+				podCIDR, serviceCIDR := ExtractK8sCIDRFromKCMPod(pod)
+				Expect(podCIDR).To(ConsistOf("192.168.0.0/16"))
+				Expect(serviceCIDR).To(ConsistOf("10.96.0.0/12"))
+			})
+		})
+
+		Context("when no CIDR values are provided", func() {
+			BeforeEach(func() {
+				pod.Spec.Containers[0].Command = []string{
+					"/path/to/kube-controller-manager",
+				}
+				pod.Spec.Containers[0].Args = []string{}
+			})
+
+			It("should return empty slices for pod CIDR and service CIDR", func() {
+				podCIDR, serviceCIDR := ExtractK8sCIDRFromKCMPod(pod)
+				Expect(podCIDR).To(BeEmpty())
+				Expect(serviceCIDR).To(BeEmpty())
+			})
+		})
+
+		Context("when invalid CIDR values are provided", func() {
+			BeforeEach(func() {
+				pod.Spec.Containers[0].Command = []string{
+					"/path/to/kube-controller-manager",
+					"--cluster-cidr=invalidCIDR",
+					"--service-cluster-ip-range=alsoInvalidCIDR",
+				}
+			})
+
+			It("should return empty slices for pod CIDR and service CIDR", func() {
+				podCIDR, serviceCIDR := ExtractK8sCIDRFromKCMPod(pod)
+				Expect(podCIDR).To(BeEmpty())
+				Expect(serviceCIDR).To(BeEmpty())
+			})
+		})
+
+		Context("when multiple CIDR values are provided", func() {
+			BeforeEach(func() {
+				pod.Spec.Containers[0].Command = []string{
+					"/path/to/kube-controller-manager",
+					"--cluster-cidr=192.168.0.0/16,192.168.1.0/24",
+					"--service-cluster-ip-range=10.96.0.0/12,10.97.0.0/16",
+				}
+			})
+
+			It("should extract all valid pod CIDR and service CIDR", func() {
+				podCIDR, serviceCIDR := ExtractK8sCIDRFromKCMPod(pod)
+				Expect(podCIDR).To(ConsistOf("192.168.0.0/16", "192.168.1.0/24"))
+				Expect(serviceCIDR).To(ConsistOf("10.96.0.0/12", "10.97.0.0/16"))
+			})
+		})
+	})
+
+})
diff --git a/test/e2e/spidercoordinator/spidercoordinator_test.go b/test/e2e/spidercoordinator/spidercoordinator_test.go
index 44416aee67..62b7457d71 100644
--- a/test/e2e/spidercoordinator/spidercoordinator_test.go
+++ b/test/e2e/spidercoordinator/spidercoordinator_test.go
@@ -13,6 +13,7 @@ import (
 	"github.com/spidernet-io/spiderpool/pkg/coordinatormanager"
 	"github.com/spidernet-io/spiderpool/pkg/ip"
 	spiderpoolv2beta1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/spiderpool.spidernet.io/v2beta1"
+	"github.com/spidernet-io/spiderpool/pkg/utils"
 	"github.com/spidernet-io/spiderpool/test/e2e/common"
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/utils/ptr"
@@ -431,7 +432,7 @@ var _ = Describe("SpiderCoordinator", Label("spidercoordinator", "overlay"), Ser
 			It("Prioritize getting ClusterCIDR from kubeadm-config", func() {
 				GinkgoWriter.Printf("podCIDR and serviceCIDR from spidercoordinator: %v,%v\n", spc.Status.OverlayPodCIDR, spc.Status.ServiceCIDR)
 
-				podCIDR, serviceCIDr, err := coordinatormanager.ExtractK8sCIDRFromKubeadmConfigMap(cm)
+				podCIDR, serviceCIDr, err := utils.ExtractK8sCIDRFromKubeadmConfigMap(cm)
 				Expect(err).NotTo(HaveOccurred(), "Failed to extract k8s CIDR from Kubeadm configMap, error is %v", err)
 				GinkgoWriter.Printf("podCIDR and serviceCIDR from kubeadm-config : %v,%v\n", podCIDR, serviceCIDr)
 
@@ -467,7 +468,7 @@ var _ = Describe("SpiderCoordinator", Label("spidercoordinator", "overlay"), Ser
 			allPods, err := frame.GetPodList(client.MatchingLabels{"component": "kube-controller-manager"})
 			Expect(err).NotTo(HaveOccurred())
 
-			kcmPodCIDR, kcmServiceCIDR := coordinatormanager.ExtractK8sCIDRFromKCMPod(&allPods.Items[0])
+			kcmPodCIDR, kcmServiceCIDR := utils.ExtractK8sCIDRFromKCMPod(&allPods.Items[0])
 			GinkgoWriter.Printf("podCIDR and serviceCIDR from kube-controller-manager pod : %v,%v\n", kcmPodCIDR, kcmServiceCIDR)
 
 			Eventually(func() bool {