From 9f45801e2506cb367b264551028919b9f3129cbc Mon Sep 17 00:00:00 2001 From: Cyclinder Kuo Date: Thu, 2 Jan 2025 11:28:58 +0800 Subject: [PATCH] coodirnator: set timeout for ip and gateway detection Signed-off-by: Cyclinder Kuo --- cmd/coordinator/cmd/cni_types.go | 6 +- docs/concepts/coordinator-zh_CN.md | 2 +- docs/concepts/coordinator.md | 2 +- pkg/errgroup/errgroup.go | 11 +++ pkg/networking/gwconnection/connection.go | 56 +++++------ pkg/networking/ipchecking/ipchecking.go | 112 +++++++--------------- 6 files changed, 77 insertions(+), 112 deletions(-) diff --git a/cmd/coordinator/cmd/cni_types.go b/cmd/coordinator/cmd/cni_types.go index c5b431b5ba..dda783283c 100644 --- a/cmd/coordinator/cmd/cni_types.go +++ b/cmd/coordinator/cmd/cni_types.go @@ -269,8 +269,8 @@ func validateRPFilterConfig(rpfilter *int32, coordinatorConfig int64) (*int32, e func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) { if config == nil { return &DetectOptions{ - Interval: "1s", - TimeOut: "3s", + Interval: "10ms", + TimeOut: "100ms", Retry: 3, }, nil } @@ -284,7 +284,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) { } if config.TimeOut == "" { - config.TimeOut = "100ms" + config.TimeOut = "500ms" } _, err := time.ParseDuration(config.Interval) diff --git a/docs/concepts/coordinator-zh_CN.md b/docs/concepts/coordinator-zh_CN.md index 2c7b990543..abaa0cd44a 100644 --- a/docs/concepts/coordinator-zh_CN.md +++ b/docs/concepts/coordinator-zh_CN.md @@ -40,7 +40,7 @@ Spiderpool 内置一个叫 `coordinator` 的 CNI meta-plugin, 它在 Main CNI | hostRuleTable | 策略路由表号,同主机与 Pod 通信的路由将会存放于这个表号 | 整数型 | optional | 500 | | podRPFilter | 设置 Pod 的 sysctl 参数 rp_filter | 整数型 | optional | 0 | | hostRPFilter | (遗弃)设置节点 的 sysctl 参数 rp_filter | 整数型 | optional | 0 | -| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 100ms) | 对象类型 | optional | 空 | +| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括发送探测报文次数(retries: 默认为 3 次), 和响应的超时时间(timeout: 默认为 100ms),还有发送报文的间隔(interval:默认为 10ms, 将会在未来版本中移除) | 对象类型 | optional | 空 | | logOptions | 日志配置,包括 logLevel(默认为 debug) 和 logFile(默认为 /var/log/spidernet/coordinator.log) | 对象类型 | optional | - | > 如果您通过 `SpinderMultusConfig CR` 帮助创建 NetworkAttachmentDefinition CR,您可以在 `SpinderMultusConfig` 中配置 `coordinator` (所有字段)。参考: [SpinderMultusConfig](../reference/crd-spidermultusconfig.md)。 diff --git a/docs/concepts/coordinator.md b/docs/concepts/coordinator.md index 085bd62d06..1d68919ad5 100644 --- a/docs/concepts/coordinator.md +++ b/docs/concepts/coordinator.md @@ -41,7 +41,7 @@ Let's delve into how coordinator implements these features. | hostRuleTable | The routes on the host that communicates with the pod's underlay IPs will belong to this routing table number | int | optional | 500 | | podRPFilter | Set the rp_filter sysctl parameter on the pod, which is recommended to be set to 0 | int | optional | 0 | | hostRPFilter | (deprecated)Set the rp_filter sysctl parameter on the node, which is recommended to be set to 0 | int | optional | 0 | -| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 100ms) | obejct | optional | nil | +| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including the number of the send packets(retries: default is 3) and the response timeout(timeout: default is 100ms) and the packet sending interval(interval: default is 10ms, which will be removed in the future version). | obejct | optional | nil | | logOptions | The configuration of logging, including logLevel(default is debug) and logFile(default is /var/log/spidernet/coordinator.log) | obejct | optional | nil | > You can configure `coordinator` by specifying all the relevant fields in `SpinderMultusConfig` if a NetworkAttachmentDefinition CR is created via `SpinderMultusConfig CR`. For more information, please refer to [SpinderMultusConfig](../reference/crd-spidermultusconfig.md). diff --git a/pkg/errgroup/errgroup.go b/pkg/errgroup/errgroup.go index fdcb3846db..cb716eb207 100644 --- a/pkg/errgroup/errgroup.go +++ b/pkg/errgroup/errgroup.go @@ -11,6 +11,7 @@ package errgroup import ( + "context" "fmt" "runtime" "sync" @@ -43,6 +44,16 @@ func (g *Group) done() { g.wg.Done() } +// WithContext returns a new Group and an associated Context derived from ctx. +// +// The derived Context is canceled the first time a function passed to Go +// returns a non-nil error or the first time Wait returns, whichever occurs +// first. +func WithContext(ctx context.Context) (*Group, context.Context) { + ctx, cancel := context.WithCancelCause(ctx) + return &Group{cancel: cancel}, ctx +} + // Wait blocks until all function calls from the Go method have returned, then // returns the first non-nil error (if any) from them. func (g *Group) Wait() error { diff --git a/pkg/networking/gwconnection/connection.go b/pkg/networking/gwconnection/connection.go index 2dbe097fcb..6a81d5a183 100644 --- a/pkg/networking/gwconnection/connection.go +++ b/pkg/networking/gwconnection/connection.go @@ -72,37 +72,35 @@ func (dg *DetectGateway) ArpingOverIface() error { defer client.Close() gwNetIP := netip.MustParseAddr(dg.V4Gw.String()) - var gwHwAddr net.HardwareAddr - for i := 0; i < dg.retries; i++ { + if err = client.SetDeadline(time.Now().Add(dg.timeout)); err != nil { + dg.logger.Sugar().Errorf("failed to set deadline: %v", err) + return err + } - err = client.SetReadDeadline(time.Now().Add(dg.timeout)) + for i := 0; i < dg.retries; i++ { + dg.logger.Sugar().Debugf("[Retry: %v]try to send the arp request", i+1) + err := client.Request(gwNetIP) if err != nil { - dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err) - time.Sleep(dg.interval) + dg.logger.Sugar().Errorf("[Retry: %v]failed to send the arp request: %v", i+1, err) continue } - dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1) - gwHwAddr, err = client.Resolve(gwNetIP) + } + + // Loop and wait for replies + for { + res, _, err := client.Read() if err != nil { - dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err) - time.Sleep(dg.interval) - continue + dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err) + return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable) } - if gwHwAddr != nil { - dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String()) - return nil + if res.Operation != arp.OperationReply || res.SenderIP != gwNetIP { + continue } - time.Sleep(dg.interval) - } - - if neterr, ok := err.(net.Error); ok && neterr.Timeout() { - dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), err) - return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable) + dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, res.SenderHardwareAddr.String()) + return nil } - - return fmt.Errorf("failed to checking the gateway %s if is reachable: %w", dg.V4Gw.String(), err) } func (dg *DetectGateway) NDPingOverIface() error { @@ -127,17 +125,13 @@ func (dg *DetectGateway) NDPingOverIface() error { }, } - ticker := time.NewTicker(dg.interval) - defer ticker.Stop() - var gwHwAddr string for i := 0; i < dg.retries && gwHwAddr == ""; i++ { - <-ticker.C gwHwAddr, err = dg.sendReceive(client, msg) if err != nil { dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err) } else if gwHwAddr != "" { - dg.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", dg.V6Gw.String(), gwHwAddr) + dg.logger.Sugar().Infof("gateway %s is reachable, it is located at %s", dg.V6Gw.String(), gwHwAddr) return nil } } @@ -159,6 +153,11 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err) } + if err := client.SetDeadline(time.Now().Add(dg.timeout)); err != nil { + dg.logger.Error("[NDP]failed to set deadline", zap.Error(err)) + return "", fmt.Errorf("failed to set deadline: %v", err) + } + // we send a gratuitous neighbor solicitation to checking if ip is conflict err = client.WriteTo(m, nil, snm) if err != nil { @@ -166,11 +165,6 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e return "", fmt.Errorf("failed to send message: %v", err) } - if err := client.SetReadDeadline(time.Now().Add(dg.timeout)); err != nil { - dg.logger.Error("[NDP]failed to set deadline", zap.Error(err)) - return "", fmt.Errorf("failed to set deadline: %v", err) - } - msg, _, _, err := client.ReadFrom() if err != nil { return "", err diff --git a/pkg/networking/ipchecking/ipchecking.go b/pkg/networking/ipchecking/ipchecking.go index eb7f91e691..c4053e8104 100644 --- a/pkg/networking/ipchecking/ipchecking.go +++ b/pkg/networking/ipchecking/ipchecking.go @@ -4,12 +4,10 @@ package ipchecking import ( - "context" "errors" "fmt" "net" "net/netip" - "runtime" "time" types100 "github.com/containernetworking/cni/pkg/types/100" @@ -17,6 +15,7 @@ import ( "github.com/mdlayher/arp" "github.com/mdlayher/ethernet" "github.com/mdlayher/ndp" + "github.com/spidernet-io/spiderpool/pkg/constant" "github.com/spidernet-io/spiderpool/pkg/errgroup" "go.uber.org/zap" ) @@ -59,7 +58,7 @@ func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS, } func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface string, errg *errgroup.Group) { - ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries)) + ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries), zap.String("timeout", ipc.timeout.String())) if len(ipconfigs) == 0 { ipc.logger.Info("No ips found in pod, ignore pod ip's conflict checking") return @@ -97,91 +96,50 @@ func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface } func (ipc *IPChecker) ipCheckingByARP() error { - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - defer ipc.arpClient.Close() - var conflictingMac string var err error - // start a goroutine to receive arp response - go func() { - runtime.LockOSThread() - - // switch to pod's netns - if e := ipc.netns.Set(); e != nil { - ipc.logger.Warn("Detect IP Conflict: failed to switch to pod's net namespace") + for i := 0; i < ipc.retries; i++ { + ipc.logger.Sugar().Debugf("[Retry: %v]try to arping the ip", i+1) + if err = ipc.arpClient.SetDeadline(time.Now().Add(ipc.timeout)); err != nil { + ipc.logger.Error("[ARP]failed to set deadline", zap.Error(err)) + continue } - defer func() { - err := ipc.hostNs.Set() // switch back - if err == nil { - // Unlock the current thread only when we successfully switched back - // to the original namespace; otherwise leave the thread locked which - // will force the runtime to scrap the current thread, that is maybe - // not as optimal but at least always safe to do. - runtime.UnlockOSThread() - } - }() - - var packet *arp.Packet - for { - select { - case <-ctx.Done(): - return - default: - packet, _, err = ipc.arpClient.Read() - if err != nil { - cancel() - return - } + // we send a gratuitous arp to checking if ip is conflict + // we use dad mode(duplicate address detection mode), so + // we set source ip to 0.0.0.0 + packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4) + if err != nil { + return err + } - if packet.Operation == arp.OperationReply { - // found reply and simple check if the reply packet is we want. - if packet.SenderIP.Compare(ipc.ip4) == 0 { - conflictingMac = packet.SenderHardwareAddr.String() - cancel() - return - } - } - } + err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast) + if err != nil { + ipc.logger.Error("[ARP]failed to send message", zap.Error(err)) + continue } - }() - // we send a gratuitous arp to checking if ip is conflict - // we use dad mode(duplicate address detection mode), so - // we set source ip to 0.0.0.0 - packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4) - if err != nil { - cancel() - return err - } + packet, _, err = ipc.arpClient.Read() + if err != nil { + ipc.logger.Error("[ARP]failed to receive message", zap.Error(err)) + continue + } - ticker := time.NewTicker(ipc.interval) - defer ticker.Stop() - - stop := false - for i := 0; i < ipc.retries && !stop; i++ { - select { - case <-ctx.Done(): - stop = true - case <-ticker.C: - err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast) - if err != nil { - stop = true - } + if packet.Operation != arp.OperationReply || packet.SenderIP.Compare(ipc.ip4) != 0 { + continue } - } - if err != nil { - return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err) + // found ip conflicting + ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", packet.SenderHardwareAddr.String())) + return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s", + constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), packet.SenderHardwareAddr.String()) } - if conflictingMac != "" { - // found ip conflicting - ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", conflictingMac)) - return fmt.Errorf("pod's interface %s with an conflicting ip %s, %s is located at %s", ipc.ifi.Name, - ipc.ip4.String(), ipc.ip4.String(), conflictingMac) + if err != nil { + if neterr, ok := err.(net.Error); ok && !neterr.Timeout() { + return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err) + } } ipc.logger.Debug("No ipv4 address conflict", zap.String("IPv4 address", ipc.ip4.String())) @@ -227,7 +185,9 @@ func (ipc *IPChecker) ipCheckingByNDP() error { func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) { var hwAddr string var err error + for i := 0; i < ipc.retries; i++ { + ipc.logger.Sugar().Debugf("[Retry: %v]try to ndping the ip", i+1) hwAddr, err = ipc.sendReceive(msg) switch err { case errRetry: @@ -267,7 +227,7 @@ func (ipc *IPChecker) sendReceive(m ndp.Message) (string, error) { return "", fmt.Errorf("failed to send message: %v", err) } - if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.interval)); err != nil { + if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.timeout)); err != nil { ipc.logger.Error("[NDP]failed to set deadline", zap.Error(err)) return "", fmt.Errorf("failed to set deadline: %v", err) }