Skip to content

Commit

Permalink
coodirnator: set timeout for ip and gateway detection
Browse files Browse the repository at this point in the history
Signed-off-by: Cyclinder Kuo <[email protected]>
  • Loading branch information
cyclinder committed Jan 2, 2025
1 parent aea3599 commit ec4a945
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 88 deletions.
6 changes: 3 additions & 3 deletions cmd/coordinator/cmd/cni_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ func validateRPFilterConfig(rpfilter *int32, coordinatorConfig int64) (*int32, e
func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
if config == nil {
return &DetectOptions{
Interval: "1s",
TimeOut: "3s",
Interval: "10ms",
TimeOut: "100ms",
Retry: 3,
}, nil
}
Expand All @@ -289,7 +289,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
}

if config.TimeOut == "" {
config.TimeOut = "100ms"
config.TimeOut = "500ms"
}

_, err := time.ParseDuration(config.Interval)
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator-zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ EOF
| podRPFilter | 设置 Pod 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| hostRPFilter | (遗弃)设置节点 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| txQueueLen | 设置 Pod 的网卡传输队列 | 整数型 | optional | 0 |
| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 100ms) | 对象类型 | optional ||
| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括发送探测报文次数(默认为 3 次), 和响应的超时时间(默认为 100ms) | 对象类型 | optional ||
| logOptions | 日志配置,包括 logLevel(默认为 debug) 和 logFile(默认为 /var/log/spidernet/coordinator.log) | 对象类型 | optional | - |

> 如果您通过 `SpinderMultusConfig CR` 帮助创建 NetworkAttachmentDefinition CR,您可以在 `SpinderMultusConfig` 中配置 `coordinator` (所有字段)。参考: [SpinderMultusConfig](../reference/crd-spidermultusconfig.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Let's delve into how coordinator implements these features.
| podRPFilter | Set the rp_filter sysctl parameter on the pod, which is recommended to be set to 0 | int | optional | 0 |
| hostRPFilter | (deprecated)Set the rp_filter sysctl parameter on the node, which is recommended to be set to 0 | int | optional | 0 |
| txQueueLen | set txqueuelen(Transmit Queue Length) of the pod's interface | int | optional | 0 |
| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 100ms) | obejct | optional | nil |
| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including the number of the send packets(default is 3) and the response timeout(default is 100ms) | obejct | optional | nil |
| logOptions | The configuration of logging, including logLevel(default is debug) and logFile(default is /var/log/spidernet/coordinator.log) | obejct | optional | nil |

> You can configure `coordinator` by specifying all the relevant fields in `SpinderMultusConfig` if a NetworkAttachmentDefinition CR is created via `SpinderMultusConfig CR`. For more information, please refer to [SpinderMultusConfig](../reference/crd-spidermultusconfig.md).
Expand Down
11 changes: 11 additions & 0 deletions pkg/errgroup/errgroup.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
package errgroup

import (
"context"
"fmt"
"runtime"
"sync"
Expand Down Expand Up @@ -43,6 +44,16 @@ func (g *Group) done() {
g.wg.Done()
}

// WithContext returns a new Group and an associated Context derived from ctx.
//
// The derived Context is canceled the first time a function passed to Go
// returns a non-nil error or the first time Wait returns, whichever occurs
// first.
func WithContext(ctx context.Context) (*Group, context.Context) {
ctx, cancel := context.WithCancelCause(ctx)
return &Group{cancel: cancel}, ctx
}

// Wait blocks until all function calls from the Go method have returned, then
// returns the first non-nil error (if any) from them.
func (g *Group) Wait() error {
Expand Down
31 changes: 9 additions & 22 deletions pkg/networking/gwconnection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,27 +74,18 @@ func (dg *DetectGateway) ArpingOverIface() error {
gwNetIP := netip.MustParseAddr(dg.V4Gw.String())
var gwHwAddr net.HardwareAddr
for i := 0; i < dg.retries; i++ {

err = client.SetReadDeadline(time.Now().Add(dg.timeout))
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err)
time.Sleep(dg.interval)
continue
}

dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1)
client.SetDeadline(time.Now().Add(dg.timeout))

Check failure on line 77 in pkg/networking/gwconnection/connection.go

View workflow job for this annotation

GitHub Actions / lint-golang

Error return value of `client.SetDeadline` is not checked (errcheck)

Check failure on line 77 in pkg/networking/gwconnection/connection.go

View workflow job for this annotation

GitHub Actions / lint-golang

Error return value of `client.SetDeadline` is not checked (errcheck)
dg.logger.Sugar().Debugf("[Retry: %v]try to arping the gateway", i+1)
gwHwAddr, err = client.Resolve(gwNetIP)
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err)
time.Sleep(dg.interval)
dg.logger.Sugar().Errorf("[Retry: %v]failed to resolve: %v", i+1, err)
continue
}

if gwHwAddr != nil {
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
return nil
}
time.Sleep(dg.interval)
}

dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err)
Expand Down Expand Up @@ -123,17 +114,13 @@ func (dg *DetectGateway) NDPingOverIface() error {
},
}

ticker := time.NewTicker(dg.interval)
defer ticker.Stop()

var gwHwAddr string
for i := 0; i < dg.retries && gwHwAddr == ""; i++ {
<-ticker.C
gwHwAddr, err = dg.sendReceive(client, msg)
if err != nil {
dg.logger.Sugar().Errorf("[retry number: %v]error detect if gateway is reachable: %v", i+1, err)
} else if gwHwAddr != "" {
dg.logger.Sugar().Infof("gateway %s is reachable, it's located at %s", dg.V6Gw.String(), gwHwAddr)
dg.logger.Sugar().Infof("gateway %s is reachable, it is located at %s", dg.V6Gw.String(), gwHwAddr)
return nil
}
}
Expand All @@ -152,18 +139,18 @@ func (dg *DetectGateway) sendReceive(client *ndp.Conn, m ndp.Message) (string, e
return "", fmt.Errorf("failed to determine solicited-node multicast address: %v", err)
}

if err := client.SetDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}

// we send a gratuitous neighbor solicitation to checking if ip is conflict
err = client.WriteTo(m, nil, snm)
if err != nil {
dg.logger.Error("[NDP]failed to send message", zap.Error(err))
return "", fmt.Errorf("failed to send message: %v", err)
}

if err := client.SetReadDeadline(time.Now().Add(dg.timeout)); err != nil {
dg.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}

msg, _, _, err := client.ReadFrom()
if err != nil {
return "", err
Expand Down
88 changes: 27 additions & 61 deletions pkg/networking/ipchecking/ipchecking.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,10 @@
package ipchecking

import (
"context"
"errors"
"fmt"
"net"
"net/netip"
"runtime"
"time"

types100 "github.com/containernetworking/cni/pkg/types/100"
Expand Down Expand Up @@ -61,7 +59,7 @@ func NewIPChecker(retries int, interval, timeout string, hostNs, netns ns.NetNS,
}

func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface string, errg *errgroup.Group) {
ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries))
ipc.logger.Debug("DoIPConflictChecking", zap.String("interval", ipc.interval.String()), zap.Int("retries", ipc.retries), zap.String("timeout", ipc.timeout.String()))
if len(ipconfigs) == 0 {
ipc.logger.Info("No ips found in pod, ignore pod ip's conflict checking")
return
Expand Down Expand Up @@ -99,78 +97,44 @@ func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface
}

func (ipc *IPChecker) ipCheckingByARP() error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

defer ipc.arpClient.Close()

var conflictingMac string
var err error
// start a goroutine to receive arp response
go func() {
runtime.LockOSThread()

// switch to pod's netns
if e := ipc.netns.Set(); e != nil {
ipc.logger.Warn("Detect IP Conflict: failed to switch to pod's net namespace")
}

defer func() {
err := ipc.hostNs.Set() // switch back
if err == nil {
// Unlock the current thread only when we successfully switched back
// to the original namespace; otherwise leave the thread locked which
// will force the runtime to scrap the current thread, that is maybe
// not as optimal but at least always safe to do.
runtime.UnlockOSThread()
}
}()

var packet *arp.Packet
for {
select {
case <-ctx.Done():
return
default:
packet, _, err = ipc.arpClient.Read()
if err != nil {
cancel()
return
}

if packet.Operation == arp.OperationReply {
// found reply and simple check if the reply packet is we want.
if packet.SenderIP.Compare(ipc.ip4) == 0 {
conflictingMac = packet.SenderHardwareAddr.String()
cancel()
return
}
}
}
}
}()

// we send a gratuitous arp to checking if ip is conflict
// we use dad mode(duplicate address detection mode), so
// we set source ip to 0.0.0.0
packet, err := arp.NewPacket(arp.OperationRequest, ipc.ifi.HardwareAddr, netip.MustParseAddr("0.0.0.0"), ethernet.Broadcast, ipc.ip4)
if err != nil {
cancel()
return err
}

ticker := time.NewTicker(ipc.interval)
defer ticker.Stop()

END:
loop:
for i := 0; i < ipc.retries; i++ {
select {
case <-ctx.Done():
break END
case <-ticker.C:
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
ipc.logger.Sugar().Debugf("[Retry: %v]try to arping the ip", i+1)
ipc.arpClient.SetWriteDeadline(time.Now().Add(ipc.timeout))

Check failure on line 116 in pkg/networking/ipchecking/ipchecking.go

View workflow job for this annotation

GitHub Actions / lint-golang

Error return value of `ipc.arpClient.SetWriteDeadline` is not checked (errcheck)

Check failure on line 116 in pkg/networking/ipchecking/ipchecking.go

View workflow job for this annotation

GitHub Actions / lint-golang

Error return value of `ipc.arpClient.SetWriteDeadline` is not checked (errcheck)
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
if err != nil {
ipc.logger.Error("[ARP]failed to send message", zap.Error(err))
}

ipc.arpClient.SetReadDeadline(time.Now().Add(ipc.timeout))

Check failure on line 122 in pkg/networking/ipchecking/ipchecking.go

View workflow job for this annotation

GitHub Actions / lint-golang

Error return value of `ipc.arpClient.SetReadDeadline` is not checked (errcheck)

Check failure on line 122 in pkg/networking/ipchecking/ipchecking.go

View workflow job for this annotation

GitHub Actions / lint-golang

Error return value of `ipc.arpClient.SetReadDeadline` is not checked (errcheck)
for {
packet, _, err = ipc.arpClient.Read()
if err != nil {
break END
ipc.logger.Error("[ARP]failed to receive message", zap.Error(err))
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
break
}
}

if packet.Operation == arp.OperationReply {
// found reply and simple check if the reply packet is we want.
if packet.SenderIP.Compare(ipc.ip4) == 0 {
conflictingMac = packet.SenderHardwareAddr.String()
break loop
}
}
}
}
Expand Down Expand Up @@ -229,7 +193,9 @@ func (ipc *IPChecker) ipCheckingByNDP() error {
func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) {
var hwAddr string
var err error

for i := 0; i < ipc.retries; i++ {
ipc.logger.Sugar().Debugf("[Retry: %v]try to ndping the ip", i+1)
hwAddr, err = ipc.sendReceive(msg)
switch err {
case errRetry:
Expand Down Expand Up @@ -269,7 +235,7 @@ func (ipc *IPChecker) sendReceive(m ndp.Message) (string, error) {
return "", fmt.Errorf("failed to send message: %v", err)
}

if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.interval)); err != nil {
if err := ipc.ndpClient.SetReadDeadline(time.Now().Add(ipc.timeout)); err != nil {
ipc.logger.Error("[NDP]failed to set deadline", zap.Error(err))
return "", fmt.Errorf("failed to set deadline: %v", err)
}
Expand Down

0 comments on commit ec4a945

Please sign in to comment.