coordinator: optimize the detection timeout for ip conflict and gateway detection #4510

Merged · 1 commit · Jan 3, 2025
6 changes: 3 additions & 3 deletions cmd/coordinator/cmd/cni_types.go
@@ -274,8 +274,8 @@ func validateRPFilterConfig(rpfilter *int32, coordinatorConfig int64) (*int32, e
func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
if config == nil {
return &DetectOptions{
Interval: "1s",
TimeOut: "3s",
Interval: "10ms",
TimeOut: "100ms",
Retry: 3,
}, nil
}
@@ -289,7 +289,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
}

if config.TimeOut == "" {
config.TimeOut = "100ms"
config.TimeOut = "500ms"
}

_, err := time.ParseDuration(config.Interval)
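To see the effect of this hunk in one place, here is a self-contained sketch of the defaults-and-validation flow. Only the default values shown above come from the diff; the struct shape, the non-nil fallbacks for retry and interval, and the error messages are assumptions for illustration.

```go
package sketch

import (
	"fmt"
	"time"
)

// DetectOptions mirrors the fields used in the hunk above (illustrative only).
type DetectOptions struct {
	Interval string
	TimeOut  string
	Retry    int
}

// validateDetectOptions applies the new, faster defaults and validates the durations.
func validateDetectOptions(config *DetectOptions) (*DetectOptions, error) {
	if config == nil {
		// New defaults from this PR: probe every 10ms, give up after 100ms, 3 retries.
		return &DetectOptions{Interval: "10ms", TimeOut: "100ms", Retry: 3}, nil
	}
	if config.Retry == 0 {
		config.Retry = 3 // assumption: fallback for an unset retry count
	}
	if config.Interval == "" {
		config.Interval = "10ms" // assumption: fallback for an unset interval
	}
	if config.TimeOut == "" {
		config.TimeOut = "500ms" // per the hunk: an explicitly empty timeout now falls back to 500ms
	}
	if _, err := time.ParseDuration(config.Interval); err != nil {
		return nil, fmt.Errorf("invalid detectOptions.interval %q: %w", config.Interval, err)
	}
	if _, err := time.ParseDuration(config.TimeOut); err != nil {
		return nil, fmt.Errorf("invalid detectOptions.timeout %q: %w", config.TimeOut, err)
	}
	return config, nil
}
```

With the nil-config path, a pod that configures nothing now gets a 10ms probe interval and a 100ms timeout instead of 1s/3s, which is what makes detection noticeably faster by default.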
2 changes: 1 addition & 1 deletion docs/concepts/coordinator-zh_CN.md
@@ -41,7 +41,7 @@ EOF
| podRPFilter | 设置 Pod 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| hostRPFilter | (遗弃)设置节点 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| txQueueLen | 设置 Pod 的网卡传输队列 | 整数型 | optional | 0 |
- | detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 100ms) | 对象类型 | optional | 空 |
+ | detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括发送探测报文次数(retries: 默认为 3 次), 和响应的超时时间(timeout: 默认为 100ms),还有发送报文的间隔(interval:默认为 10ms, 将会在未来版本中移除) | 对象类型 | optional | 空 |
| logOptions | 日志配置,包括 logLevel(默认为 debug) 和 logFile(默认为 /var/log/spidernet/coordinator.log) | 对象类型 | optional | - |

> 如果您通过 `SpinderMultusConfig CR` 帮助创建 NetworkAttachmentDefinition CR,您可以在 `SpinderMultusConfig` 中配置 `coordinator` (所有字段)。参考: [SpinderMultusConfig](../reference/crd-spidermultusconfig.md)。
2 changes: 1 addition & 1 deletion docs/concepts/coordinator.md
@@ -42,7 +42,7 @@ Let's delve into how coordinator implements these features.
| podRPFilter | Set the rp_filter sysctl parameter on the pod, which is recommended to be set to 0 | int | optional | 0 |
| hostRPFilter | (deprecated)Set the rp_filter sysctl parameter on the node, which is recommended to be set to 0 | int | optional | 0 |
| txQueueLen | set txqueuelen(Transmit Queue Length) of the pod's interface | int | optional | 0 |
- | detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 100ms) | obejct | optional | nil |
+ | detectOptions | The advanced configuration of detectGateway and detectIPConflict, including the number of probe packets to send (retries: default is 3), the response timeout (timeout: default is 100ms), and the packet sending interval (interval: default is 10ms, to be removed in a future version) | obejct | optional | nil |
| logOptions | The configuration of logging, including logLevel(default is debug) and logFile(default is /var/log/spidernet/coordinator.log) | obejct | optional | nil |

> You can configure `coordinator` by specifying all the relevant fields in `SpinderMultusConfig` if a NetworkAttachmentDefinition CR is created via `SpinderMultusConfig CR`. For more information, please refer to [SpinderMultusConfig](../reference/crd-spidermultusconfig.md).
11 changes: 11 additions & 0 deletions pkg/errgroup/errgroup.go
@@ -11,6 +11,7 @@
package errgroup

import (
"context"
"fmt"
"runtime"
"sync"
@@ -43,6 +44,16 @@ func (g *Group) done() {
g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
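A minimal usage sketch of the new WithContext helper follows. It assumes the module path github.com/spidernet-io/spiderpool and that this local errgroup keeps the golang.org/x/sync/errgroup-style Go/Wait API; neither is shown in the hunk above.

```go
package main

import (
	"context"
	"fmt"

	"github.com/spidernet-io/spiderpool/pkg/errgroup" // assumed import path
)

func main() {
	// The derived ctx is canceled the first time a task returns a non-nil
	// error, or when Wait returns, whichever happens first.
	g, ctx := errgroup.WithContext(context.Background())

	g.Go(func() error {
		return nil // e.g. IPv4 address-conflict detection
	})
	g.Go(func() error {
		select {
		case <-ctx.Done():
			// A sibling task already failed; stop early.
			return context.Cause(ctx)
		default:
			return nil // e.g. IPv6 gateway detection
		}
	})

	if err := g.Wait(); err != nil {
		fmt.Println("detection failed:", err)
	}
}
```

The derived context lets one detection task bail out early once a sibling task has already failed, instead of running to its own timeout.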
53 changes: 26 additions & 27 deletions pkg/networking/gwconnection/connection.go
@@ -72,33 +72,36 @@ func (dg *DetectGateway) ArpingOverIface() error {
defer client.Close()

gwNetIP := netip.MustParseAddr(dg.V4Gw.String())
var gwHwAddr net.HardwareAddr
for i := 0; i < dg.retries; i++ {
if err = client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Sugar().Errorf("failed to set deadline: %v", err)
return err
}

err = client.SetReadDeadline(time.Now().Add(dg.timeout))
for i := 0; i < dg.retries; i++ {
dg.logger.Sugar().Debugf("[Retry: %v]try to send the arp request", i+1)
err := client.Request(gwNetIP)
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err)
time.Sleep(dg.interval)
dg.logger.Sugar().Errorf("[Retry: %v]failed to send the arp request: %v", i+1, err)
continue
}

dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1)
gwHwAddr, err = client.Resolve(gwNetIP)
}

// Loop and wait for replies
for {
res, _, err := client.Read()
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err)
time.Sleep(dg.interval)
continue
dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err)
return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
}

if gwHwAddr != nil {
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
return nil
if res.Operation != arp.OperationReply || res.SenderIP != gwNetIP {
continue
}
time.Sleep(dg.interval)
}

dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err)
return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, res.SenderHardwareAddr.String())
return nil
}
}

func (dg *DetectGateway) NDPingOverIface() error {
@@ -123,17 +126,13 @@ func (dg *DetectGateway) NDPingOverIface() error {
},
}

ticker := time.NewTicker(dg.interval)
defer ticker.Stop()

var gwHwAddr string
for i := 0; i < dg.retries && gwHwAddr == ""; i++ {
<-ticker.C
gwHwAddr, err = dg.sendReceive(client, msg)
if err != nil {
dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err)
} else if gwHwAddr != "" {
dg.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", dg.V6Gw.String(), gwHwAddr)
dg.logger.Sugar().Infof("gateway %s is reachable, it is located at %s", dg.V6Gw.String(), gwHwAddr)
return nil
}
}
@@ -152,18 +151,18 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e
return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err)
}

if err := client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}

// we send a gratuitous neighbor solicitation to checking if ip is conflict
err = client.WriteTo(m, nil, snm)
if err != nil {
dg.logger.Error("[NDP]failed to send message", zap.Error(err))
return "", fmt.Errorf("failed to send message: %v", err)
}

if err := client.SetReadDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}

msg, _, _, err := client.ReadFrom()
if err != nil {
return "", err
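Because the diff view above interleaves removed and added lines, here is a condensed, self-contained sketch of the new IPv4 gateway check: send every ARP request up front under a single deadline, then block in one Read loop for a matching reply instead of sleeping dg.interval between attempts. The helper name and signature are illustrative; the arp and zap calls mirror those in the hunk.

```go
package sketch

import (
	"fmt"
	"net/netip"
	"time"

	"github.com/mdlayher/arp"
	"go.uber.org/zap"
)

// arpingGateway sends `retries` ARP requests for gw back-to-back, then waits
// (bounded by the client deadline) for a reply whose sender IP is the gateway.
func arpingGateway(client *arp.Client, gw netip.Addr, retries int, timeout time.Duration, log *zap.SugaredLogger) error {
	// One deadline covers both the sends and the read loop below.
	if err := client.SetDeadline(time.Now().Add(timeout)); err != nil {
		return err
	}

	for i := 0; i < retries; i++ {
		if err := client.Request(gw); err != nil {
			log.Errorf("[Retry: %v]failed to send the arp request: %v", i+1, err)
		}
	}

	// Wait for replies; a deadline error here means the gateway never answered.
	for {
		res, _, err := client.Read()
		if err != nil {
			return fmt.Errorf("gateway %s is unreachable: %v", gw, err)
		}
		if res.Operation != arp.OperationReply || res.SenderIP != gw {
			continue // some other ARP traffic; keep waiting
		}
		log.Infof("Gateway %s is reachable, located at %v", gw, res.SenderHardwareAddr)
		return nil
	}
}
```

Compared with the old per-retry Resolve plus time.Sleep(interval) loop, the total wait is now bounded roughly by the single timeout rather than by retries × (timeout + interval).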
111 changes: 35 additions & 76 deletions pkg/networking/ipchecking/ipchecking.go
@@ -4,12 +4,10 @@
package ipchecking

import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"runtime"
"time"

types100 "github.com/containernetworking/cni/pkg/types/100"
@@ -61,7 +59,7 @@ func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS,
}

func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface string, errg *errgroup.Group) {
ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries))
ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries), zap.String("timeout", ipc.timeout.String()))
if len(ipconfigs) == 0 {
ipc.logger.Info("No ips found in pod, ignore pod ip's conflict checking")
return
@@ -99,91 +97,50 @@ func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface
}

func (ipc *IPChecker) ipCheckingByARP() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer ipc.arpClient.Close()

var conflictingMac string
var err error
// start a goroutine to receive arp response
go func() {
runtime.LockOSThread()

// switch to pod's netns
if e := ipc.netns.Set(); e != nil {
ipc.logger.Warn("Detect IP Conflict: failed to switch to pod's net namespace")
for i := 0; i < ipc.retries; i++ {
ipc.logger.Sugar().Debugf("[Retry: %v]try to arping the ip", i+1)
if err = ipc.arpClient.SetDeadline(time.Now().Add(ipc.timeout)); err != nil {
ipc.logger.Error("[ARP]failed to set deadline", zap.Error(err))
continue
}

defer func() {
err := ipc.hostNs.Set() // switch back
if err == nil {
// Unlock the current thread only when we successfully switched back
// to the original namespace; otherwise leave the thread locked which
// will force the runtime to scrap the current thread, that is maybe
// not as optimal but at least always safe to do.
runtime.UnlockOSThread()
}
}()

var packet *arp.Packet
for {
select {
case <-ctx.Done():
return
default:
packet, _, err = ipc.arpClient.Read()
if err != nil {
cancel()
return
}

if packet.Operation == arp.OperationReply {
// found reply and simple check if the reply packet is we want.
if packet.SenderIP.Compare(ipc.ip4) == 0 {
conflictingMac = packet.SenderHardwareAddr.String()
cancel()
return
}
}
}
// we send a gratuitous arp to checking if ip is conflict
// we use dad mode(duplicate address detection mode), so
// we set source ip to 0.0.0.0
packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
if err != nil {
return err
}
}()

// we send a gratuitous arp to checking if ip is conflict
// we use dad mode(duplicate address detection mode), so
// we set source ip to 0.0.0.0
packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
if err != nil {
cancel()
return err
}

ticker := time.NewTicker(ipc.interval)
defer ticker.Stop()
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
if err != nil {
ipc.logger.Error("[ARP]failed to send message", zap.Error(err))
continue
}

END:
for i := 0; i < ipc.retries; i++ {
select {
case <-ctx.Done():
break END
case <-ticker.C:
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
if err != nil {
break END
}
packet, _, err = ipc.arpClient.Read()
if err != nil {
ipc.logger.Error("[ARP]failed to receive message", zap.Error(err))
continue
}
}

if err != nil {
return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err)
}
if packet.Operation != arp.OperationReply || packet.SenderIP.Compare(ipc.ip4) != 0 {
continue
}

if conflictingMac != "" {
// found ip conflicting
ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", conflictingMac))
ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", packet.SenderHardwareAddr.String()))
return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s",
constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), conflictingMac)
constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), packet.SenderHardwareAddr.String())
}

if err != nil {
if neterr, ok := err.(net.Error); ok && !neterr.Timeout() {
return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err)
}
}

ipc.logger.Debug("No ipv4 address conflict", zap.String("IPv4 address", ipc.ip4.String()))
@@ -229,7 +186,9 @@ func (ipc *IPChecker) ipCheckingByNDP() error {
func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) {
var hwAddr string
var err error

for i := 0; i < ipc.retries; i++ {
ipc.logger.Sugar().Debugf("[Retry: %v]try to ndping the ip", i+1)
hwAddr, err = ipc.sendReceive(msg)
switch err {
case errRetry:
Expand Down Expand Up @@ -269,7 +228,7 @@ func (ipc *IPChecker) sendReceive(m ndp.Message) (string, error) {
return "", fmt.Errorf("failed to send message: %v", err)
}

- if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.interval)); err != nil {
+ if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.timeout)); err != nil {
ipc.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}
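For readers less familiar with the technique, here is a standalone sketch of ARP-based duplicate address detection as the reworked ipCheckingByARP applies it: probe with a gratuitous ARP whose sender IP is 0.0.0.0, treat a reply claiming the target address as a conflict, and treat a read timeout as "the address is free". The helper name and the interface lookup are illustrative; the arp and ethernet calls mirror the hunk.

```go
package sketch

import (
	"fmt"
	"net"
	"net/netip"
	"time"

	"github.com/mdlayher/arp"
	"github.com/mdlayher/ethernet"
)

// detectIPv4Conflict probes targetIP with a DAD-style gratuitous ARP request
// (sender IP 0.0.0.0). A reply claiming targetIP means the address is in use;
// no reply within `retries` attempts is treated as "no conflict".
func detectIPv4Conflict(ifaceName string, targetIP netip.Addr, retries int, timeout time.Duration) error {
	ifi, err := net.InterfaceByName(ifaceName)
	if err != nil {
		return err
	}
	client, err := arp.Dial(ifi)
	if err != nil {
		return err
	}
	defer client.Close()

	// DAD mode: sender IP 0.0.0.0 so the probe itself cannot poison ARP caches.
	probe, err := arp.NewPacket(arp.OperationRequest, ifi.HardwareAddr,
		netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, targetIP)
	if err != nil {
		return err
	}

	for i := 0; i < retries; i++ {
		if err := client.SetDeadline(time.Now().Add(timeout)); err != nil {
			return err
		}
		if err := client.WriteTo(probe, ethernet.Broadcast); err != nil {
			continue // try again on the next attempt
		}
		reply, _, err := client.Read()
		if err != nil {
			// Typically a deadline timeout: nobody answered for targetIP this round.
			continue
		}
		if reply.Operation == arp.OperationReply && reply.SenderIP == targetIP {
			return fmt.Errorf("ip %s is already in use by %s", targetIP, reply.SenderHardwareAddr)
		}
	}
	return nil // no reply after all retries: assume the address is free
}
```

Putting the read on a per-attempt deadline is what lets the new code drop the old receiver goroutine along with its context.WithCancel plumbing and the extra netns/OS-thread switching.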