Skip to content

Commit

Permalink
coodirnator: set timeout for ip and gateway detection
Browse files Browse the repository at this point in the history
Signed-off-by: Cyclinder Kuo <[email protected]>
  • Loading branch information
cyclinder committed Jan 3, 2025
1 parent 5014593 commit 21d133c
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 111 deletions.
6 changes: 3 additions & 3 deletions cmd/coordinator/cmd/cni_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ func validateRPFilterConfig(rpfilter *int32, coordinatorConfig int64) (*int32, e
func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
if config == nil {
return &DetectOptions{
Interval: "1s",
TimeOut: "3s",
Interval: "10ms",
TimeOut: "100ms",
Retry: 3,
}, nil
}
Expand All @@ -289,7 +289,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
}

if config.TimeOut == "" {
config.TimeOut = "100ms"
config.TimeOut = "500ms"
}

_, err := time.ParseDuration(config.Interval)
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator-zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ EOF
| podRPFilter | 设置 Pod 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| hostRPFilter | (遗弃)设置节点 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| txQueueLen | 设置 Pod 的网卡传输队列 | 整数型 | optional | 0 |
| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 100ms) | 对象类型 | optional ||
| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括发送探测报文次数(retries: 默认为 3 次), 和响应的超时时间(timeout: 默认为 100ms),还有发送报文的间隔(interval:默认为 10ms, 将会在未来版本中移除) | 对象类型 | optional ||
| logOptions | 日志配置,包括 logLevel(默认为 debug) 和 logFile(默认为 /var/log/spidernet/coordinator.log) | 对象类型 | optional | - |

> 如果您通过 `SpinderMultusConfig CR` 帮助创建 NetworkAttachmentDefinition CR,您可以在 `SpinderMultusConfig` 中配置 `coordinator` (所有字段)。参考: [SpinderMultusConfig](../reference/crd-spidermultusconfig.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Let's delve into how coordinator implements these features.
| podRPFilter | Set the rp_filter sysctl parameter on the pod, which is recommended to be set to 0 | int | optional | 0 |
| hostRPFilter | (deprecated)Set the rp_filter sysctl parameter on the node, which is recommended to be set to 0 | int | optional | 0 |
| txQueueLen | set txqueuelen(Transmit Queue Length) of the pod's interface | int | optional | 0 |
| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 100ms) | obejct | optional | nil |
| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including the number of the send packets(retries: default is 3) and the response timeout(timeout: default is 100ms) and the packet sending interval(interval: default is 10ms, which will be removed in the future version). | obejct | optional | nil |
| logOptions | The configuration of logging, including logLevel(default is debug) and logFile(default is /var/log/spidernet/coordinator.log) | obejct | optional | nil |

> You can configure `coordinator` by specifying all the relevant fields in `SpinderMultusConfig` if a NetworkAttachmentDefinition CR is created via `SpinderMultusConfig CR`. For more information, please refer to [SpinderMultusConfig](../reference/crd-spidermultusconfig.md).
Expand Down
11 changes: 11 additions & 0 deletions pkg/errgroup/errgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package errgroup

import (
"context"
"fmt"
"runtime"
"sync"
Expand Down Expand Up @@ -43,6 +44,16 @@ func (g *Group) done() {
g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
Expand Down
55 changes: 25 additions & 30 deletions pkg/networking/gwconnection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,37 +72,36 @@ func (dg *DetectGateway) ArpingOverIface() error {
defer client.Close()

gwNetIP := netip.MustParseAddr(dg.V4Gw.String())
var gwHwAddr net.HardwareAddr
for i := 0; i < dg.retries; i++ {
if err = client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Sugar().Errorf("failed to set deadline: %v", err)
return err
}

err = client.SetReadDeadline(time.Now().Add(dg.timeout))
for i := 0; i < dg.retries; i++ {
dg.logger.Sugar().Debugf("[Retry: %v]try to send the arp request", i+1)
err := client.Request(gwNetIP)
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err)
time.Sleep(dg.interval)
dg.logger.Sugar().Errorf("[Retry: %v]failed to send the arp request: %v", i+1, err)
continue
}

dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1)
gwHwAddr, err = client.Resolve(gwNetIP)
}

// Loop and wait for replies
for {
res, _, err := client.Read()
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err)
time.Sleep(dg.interval)
continue
dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err)
return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
}

if gwHwAddr != nil {
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
return nil
if res.Operation != arp.OperationReply || res.SenderIP != gwNetIP {
continue
}
time.Sleep(dg.interval)
}

if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), err)
return fmt.Errorf("gateway %s is %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable)
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, res.SenderHardwareAddr.String())
return nil
}

return fmt.Errorf("failed to checking the gateway %s if is reachable: %w", dg.V4Gw.String(), err)
}

func (dg *DetectGateway) NDPingOverIface() error {
Expand All @@ -127,17 +126,13 @@ func (dg *DetectGateway) NDPingOverIface() error {
},
}

ticker := time.NewTicker(dg.interval)
defer ticker.Stop()

var gwHwAddr string
for i := 0; i < dg.retries && gwHwAddr == ""; i++ {
<-ticker.C
gwHwAddr, err = dg.sendReceive(client, msg)
if err != nil {
dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err)
} else if gwHwAddr != "" {
dg.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", dg.V6Gw.String(), gwHwAddr)
dg.logger.Sugar().Infof("gateway %s is reachable, it is located at %s", dg.V6Gw.String(), gwHwAddr)
return nil
}
}
Expand All @@ -159,18 +154,18 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e
return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err)
}

if err := client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}

// we send a gratuitous neighbor solicitation to checking if ip is conflict
err = client.WriteTo(m, nil, snm)
if err != nil {
dg.logger.Error("[NDP]failed to send message", zap.Error(err))
return "", fmt.Errorf("failed to send message: %v", err)
}

if err := client.SetReadDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}

msg, _, _, err := client.ReadFrom()
if err != nil {
return "", err
Expand Down
111 changes: 35 additions & 76 deletions pkg/networking/ipchecking/ipchecking.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
package ipchecking

import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"runtime"
"time"

types100 "github.com/containernetworking/cni/pkg/types/100"
Expand Down Expand Up @@ -61,7 +59,7 @@ func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS,
}

func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface string, errg *errgroup.Group) {
ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries))
ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries), zap.String("timeout", ipc.timeout.String()))
if len(ipconfigs) == 0 {
ipc.logger.Info("No ips found in pod, ignore pod ip's conflict checking")
return
Expand Down Expand Up @@ -99,91 +97,50 @@ func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface
}

func (ipc *IPChecker) ipCheckingByARP() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer ipc.arpClient.Close()

var conflictingMac string
var err error
// start a goroutine to receive arp response
go func() {
runtime.LockOSThread()

// switch to pod's netns
if e := ipc.netns.Set(); e != nil {
ipc.logger.Warn("Detect IP Conflict: failed to switch to pod's net namespace")
for i := 0; i < ipc.retries; i++ {
ipc.logger.Sugar().Debugf("[Retry: %v]try to arping the ip", i+1)
if err = ipc.arpClient.SetDeadline(time.Now().Add(ipc.timeout)); err != nil {
ipc.logger.Error("[ARP]failed to set deadline", zap.Error(err))
continue
}

defer func() {
err := ipc.hostNs.Set() // switch back
if err == nil {
// Unlock the current thread only when we successfully switched back
// to the original namespace; otherwise leave the thread locked which
// will force the runtime to scrap the current thread, that is maybe
// not as optimal but at least always safe to do.
runtime.UnlockOSThread()
}
}()

var packet *arp.Packet
for {
select {
case <-ctx.Done():
return
default:
packet, _, err = ipc.arpClient.Read()
if err != nil {
cancel()
return
}

if packet.Operation == arp.OperationReply {
// found reply and simple check if the reply packet is we want.
if packet.SenderIP.Compare(ipc.ip4) == 0 {
conflictingMac = packet.SenderHardwareAddr.String()
cancel()
return
}
}
}
// we send a gratuitous arp to checking if ip is conflict
// we use dad mode(duplicate address detection mode), so
// we set source ip to 0.0.0.0
packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
if err != nil {
return err
}
}()

// we send a gratuitous arp to checking if ip is conflict
// we use dad mode(duplicate address detection mode), so
// we set source ip to 0.0.0.0
packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
if err != nil {
cancel()
return err
}

ticker := time.NewTicker(ipc.interval)
defer ticker.Stop()
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
if err != nil {
ipc.logger.Error("[ARP]failed to send message", zap.Error(err))
continue
}

END:
for i := 0; i < ipc.retries; i++ {
select {
case <-ctx.Done():
break END
case <-ticker.C:
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
if err != nil {
break END
}
packet, _, err = ipc.arpClient.Read()
if err != nil {
ipc.logger.Error("[ARP]failed to receive message", zap.Error(err))
continue
}
}

if err != nil {
return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err)
}
if packet.Operation != arp.OperationReply || packet.SenderIP.Compare(ipc.ip4) != 0 {
continue
}

if conflictingMac != "" {
// found ip conflicting
ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", conflictingMac))
ipc.logger.Error("Found IPv4 address conflicting", zap.String("Conflicting IP", ipc.ip4.String()), zap.String("Host", packet.SenderHardwareAddr.String()))
return fmt.Errorf("%w: pod's interface %s with an conflicting ip %s, %s is located at %s",
constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), conflictingMac)
constant.ErrIPConflict, ipc.ifi.Name, ipc.ip4.String(), ipc.ip4.String(), packet.SenderHardwareAddr.String())
}

if err != nil {
if neterr, ok := err.(net.Error); ok && !neterr.Timeout() {
return fmt.Errorf("failed to checking ip %s if it's conflicting: %v", ipc.ip4.String(), err)
}
}

ipc.logger.Debug("No ipv4 address conflict", zap.String("IPv4 address", ipc.ip4.String()))
Expand Down Expand Up @@ -229,7 +186,9 @@ func (ipc *IPChecker) ipCheckingByNDP() error {
func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) {
var hwAddr string
var err error

for i := 0; i < ipc.retries; i++ {
ipc.logger.Sugar().Debugf("[Retry: %v]try to ndping the ip", i+1)
hwAddr, err = ipc.sendReceive(msg)
switch err {
case errRetry:
Expand Down Expand Up @@ -269,7 +228,7 @@ func (ipc *IPChecker) sendReceive(m ndp.Message) (string, error) {
return "", fmt.Errorf("failed to send message: %v", err)
}

if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.interval)); err != nil {
if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.timeout)); err != nil {
ipc.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}
Expand Down

0 comments on commit 21d133c

Please sign in to comment.