Skip to content

Commit

Permalink
coordinator: set timeout for ip and gateway detection
Browse files Browse the repository at this point in the history
Signed-off-by: Cyclinder Kuo <[email protected]>
  • Loading branch information
cyclinder committed Jan 2, 2025
1 parent aea3599 commit 0da4740
Show file tree
Hide file tree
Showing 5 changed files with 56 additions and 36 deletions.
2 changes: 1 addition & 1 deletion cmd/coordinator/cmd/cni_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func ValidateDelectOptions(config *DetectOptions) (*DetectOptions, error) {
}

if config.TimeOut == "" {
config.TimeOut = "100ms"
config.TimeOut = "200ms"
}

_, err := time.ParseDuration(config.Interval)
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator-zh_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ EOF
| podRPFilter | 设置 Pod 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| hostRPFilter | (遗弃)设置节点 的 sysctl 参数 rp_filter | 整数型 | optional | 0 |
| txQueueLen | 设置 Pod 的网卡传输队列 | 整数型 | optional | 0 |
| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 100ms) | 对象类型 | optional ||
| detectOptions | 检测地址冲突和网关可达性的高级配置项: 包括重试次数(默认为 3 次), 探测间隔(默认为 10ms) 和 超时时间(默认为 200ms) | 对象类型 | optional ||
| logOptions | 日志配置,包括 logLevel(默认为 debug) 和 logFile(默认为 /var/log/spidernet/coordinator.log) | 对象类型 | optional | - |

> 如果您通过 `SpiderMultusConfig CR` 帮助创建 NetworkAttachmentDefinition CR,您可以在 `SpiderMultusConfig` 中配置 `coordinator` (所有字段)。参考: [SpiderMultusConfig](../reference/crd-spidermultusconfig.md)
Expand Down
2 changes: 1 addition & 1 deletion docs/concepts/coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ Let's delve into how coordinator implements these features.
| podRPFilter | Set the rp_filter sysctl parameter on the pod, which is recommended to be set to 0 | int | optional | 0 |
| hostRPFilter | (deprecated)Set the rp_filter sysctl parameter on the node, which is recommended to be set to 0 | int | optional | 0 |
| txQueueLen | set txqueuelen(Transmit Queue Length) of the pod's interface | int | optional | 0 |
| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 100ms) | object | optional | nil |
| detectOptions | The advanced configuration of detectGateway and detectIPConflict, including retry numbers(default is 3), interval(default is 10ms) and timeout(default is 200ms) | object | optional | nil |
| logOptions | The configuration of logging, including logLevel(default is debug) and logFile(default is /var/log/spidernet/coordinator.log) | object | optional | nil |

> You can configure `coordinator` by specifying all the relevant fields in `SpiderMultusConfig` if a NetworkAttachmentDefinition CR is created via `SpiderMultusConfig CR`. For more information, please refer to [SpiderMultusConfig](../reference/crd-spidermultusconfig.md).
Expand Down
48 changes: 28 additions & 20 deletions pkg/networking/gwconnection/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package gwconnection

import (
"context"
"fmt"
"net"
"net/netip"
Expand Down Expand Up @@ -71,30 +72,37 @@ func (dg *DetectGateway) ArpingOverIface() error {
}
defer client.Close()

ticker := time.NewTicker(dg.interval)
defer ticker.Stop()

ctx, cancel := context.WithTimeout(context.Background(), dg.timeout)
defer cancel()

gwNetIP := netip.MustParseAddr(dg.V4Gw.String())
var gwHwAddr net.HardwareAddr
for i := 0; i < dg.retries; i++ {

err = client.SetReadDeadline(time.Now().Add(dg.timeout))
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to set ReadDeadline: %v", i+1, err)
time.Sleep(dg.interval)
continue
}

dg.logger.Sugar().Debugf("[RetryNum: %v]try to arping the gateway", i+1)
gwHwAddr, err = client.Resolve(gwNetIP)
if err != nil {
dg.logger.Sugar().Errorf("[RetryNum: %v]failed to resolve: %v", i+1, err)
time.Sleep(dg.interval)
continue
}

if gwHwAddr != nil {
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
return nil
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
err = client.SetReadDeadline(time.Now().Add(dg.interval))
if err != nil {
dg.logger.Sugar().Errorf("[Retry: %v]failed to set ReadDeadline: %v", i+1, err)
continue
}

dg.logger.Sugar().Debugf("[Retry: %v]try to arping the gateway", i+1)
gwHwAddr, err = client.Resolve(gwNetIP)
if err != nil {
dg.logger.Sugar().Errorf("[Retry: %v]failed to resolve: %v", i+1, err)
continue
}

if gwHwAddr != nil {
dg.logger.Sugar().Infof("Gateway %s is reachable, gateway is located at %v", gwNetIP, gwHwAddr.String())
return nil
}
}
time.Sleep(dg.interval)
}

dg.logger.Sugar().Errorf("gateway %s is %v, reason: %v", dg.V4Gw.String(), constant.ErrGatewayUnreachable, err)
Expand Down
38 changes: 25 additions & 13 deletions pkg/networking/ipchecking/ipchecking.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,8 @@ func (ipc *IPChecker) DoIPConflictChecking(ipconfigs []*types100.IPConfig, iface
}

func (ipc *IPChecker) ipCheckingByARP() error {
ctx, cancel := context.WithCancel(context.Background())
ctx, cancel := context.WithTimeout(context.Background(), ipc.timeout)
defer cancel()

defer ipc.arpClient.Close()

var conflictingMac string
Expand Down Expand Up @@ -166,6 +165,7 @@ END:
for i := 0; i < ipc.retries; i++ {
select {
case <-ctx.Done():
err = ctx.Err()
break END
case <-ticker.C:
err = ipc.arpClient.WriteTo(packet, ethernet.Broadcast)
Expand Down Expand Up @@ -229,20 +229,32 @@ func (ipc *IPChecker) ipCheckingByNDP() error {
func (ipc *IPChecker) sendReceiveLoop(msg ndp.Message) (string, error) {
var hwAddr string
var err error

ctx, cancel := context.WithTimeout(context.Background(), ipc.timeout)
defer cancel()

ticker := time.NewTicker(ipc.interval)
defer ticker.Stop()

for i := 0; i < ipc.retries; i++ {
hwAddr, err = ipc.sendReceive(msg)
switch err {
case errRetry:
continue
case nil:
return hwAddr, NDPFoundReply
default:
// Was the error caused by a read timeout, and should the loop continue?
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
ipc.logger.Error(err.Error())
select {
case <-ctx.Done():
return "", ctx.Err()
case <-ticker.C:
hwAddr, err = ipc.sendReceive(msg)
switch err {
case errRetry:
continue
case nil:
return hwAddr, NDPFoundReply
default:
// Was the error caused by a read timeout, and should the loop continue?
if neterr, ok := err.(net.Error); ok && neterr.Timeout() {
ipc.logger.Error(err.Error())
continue
}
return "", err
}
return "", err
}
}

Expand Down

0 comments on commit 0da4740

Please sign in to comment.