Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update dispatch logic in nftables #9323

Merged
12 changes: 12 additions & 0 deletions felix/dataplane/linux/endpoint_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type endpointManager struct {
epMarkMapper rules.EndpointMarkMapper
newMatch func() generictables.MatchCriteria
actions generictables.ActionFactory
maps nftables.MapsDataplane

// Pending updates, cleared in CompleteDeferredWork as the data is copied to the activeXYZ
// fields.
Expand Down Expand Up @@ -226,6 +227,7 @@ func newEndpointManager(
wlInterfacePrefixes []string,
onWorkloadEndpointStatusUpdate EndpointStatusUpdateCallback,
defaultRPFilter string,
maps nftables.MapsDataplane,
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
Expand All @@ -246,6 +248,7 @@ func newEndpointManager(
writeProcSys,
os.Stat,
defaultRPFilter,
maps,
bpfEnabled,
bpfEndpointManager,
callbacks,
Expand All @@ -268,6 +271,7 @@ func newEndpointManagerWithShims(
procSysWriter procSysWriter,
osStat func(name string) (os.FileInfo, error),
defaultRPFilter string,
maps nftables.MapsDataplane,
bpfEnabled bool,
bpfEndpointManager hepListener,
callbacks *common.Callbacks,
Expand All @@ -289,6 +293,7 @@ func newEndpointManagerWithShims(
wlIfacesRegexp: wlIfacesRegexp,
kubeIPVSSupportEnabled: kubeIPVSSupportEnabled,
bpfEnabled: bpfEnabled,
maps: maps,
bpfEndpointManager: bpfEndpointManager,
floatingIPsEnabled: floatingIPsEnabled,

Expand Down Expand Up @@ -835,6 +840,13 @@ func (m *endpointManager) resolveWorkloadEndpoints() {
}

if !m.bpfEnabled && m.needToCheckDispatchChains {
if m.maps != nil {
// Update dispatch verdict maps if needed.
fromMappings, toMappings := m.ruleRenderer.DispatchMappings(m.activeWlEndpoints)
m.maps.AddOrReplaceMap(nftables.MapMetadata{Name: rules.NftablesFromWorkloadDispatchMap, Type: nftables.MapTypeInterfaceMatch}, fromMappings)
m.maps.AddOrReplaceMap(nftables.MapMetadata{Name: rules.NftablesToWorkloadDispatchMap, Type: nftables.MapTypeInterfaceMatch}, toMappings)
}

// Rewrite the dispatch chains if they've changed.
newDispatchChains := m.ruleRenderer.WorkloadDispatchChains(m.activeWlEndpoints)
m.updateDispatchChains(m.activeWlDispatchChains, newDispatchChains, m.filterTable)
Expand Down
1 change: 1 addition & 0 deletions felix/dataplane/linux/endpoint_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,7 @@ func endpointManagerTests(ipVersion uint8) func() {
mockProcSys.write,
mockProcSys.stat,
"1",
nil,
false,
hepListener,
common.NewCallbacks(),
Expand Down
20 changes: 16 additions & 4 deletions felix/dataplane/linux/int_dataplane.go
Original file line number Diff line number Diff line change
Expand Up @@ -509,7 +509,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
}
}

var nftablesV4RootTable generictables.Table
var nftablesV4RootTable *nftables.NftablesTable
var mangleTableV4, natTableV4, rawTableV4, filterTableV4 generictables.Table
var ipSetsV4 dpsets.IPSetsDataplane

Expand All @@ -530,7 +530,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
filterTableV4 = nftables.NewTableLayer("filter", nftablesV4RootTable)

// We use the root table for IP sets as well.
ipSetsV4 = nftablesV4RootTable.(dpsets.IPSetsDataplane)
ipSetsV4 = nftablesV4RootTable
} else {
// iptables mode
mangleTableV4 = iptables.NewTable(
Expand Down Expand Up @@ -726,7 +726,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
ipsetsManagerV6 := dpsets.NewIPSetsManager("ipv6", nil, config.MaxIPSetSize)

var mangleTableV6, natTableV6, rawTableV6, filterTableV6 generictables.Table
var nftablesV6RootTable generictables.Table
var nftablesV6RootTable *nftables.NftablesTable

if config.RulesConfig.NFTables {
nftablesV6RootTable = nftables.NewTable(
Expand Down Expand Up @@ -905,6 +905,11 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
}
}

var nftMaps nftables.MapsDataplane
if config.RulesConfig.NFTables {
nftMaps = nftablesV4RootTable
}

epManager := newEndpointManager(
rawTableV4,
mangleTableV4,
Expand All @@ -917,6 +922,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.RulesConfig.WorkloadIfacePrefixes,
dp.endpointStatusCombiner.OnEndpointStatusUpdate,
string(defaultRPFilter),
nftMaps,
config.BPFEnabled,
bpfEndpointManager,
callbacks,
Expand Down Expand Up @@ -969,7 +975,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
natTableV6 = nftables.NewTableLayer("nat", nftablesV6RootTable)
rawTableV6 = nftables.NewTableLayer("raw", nftablesV6RootTable)

ipSetsV6 = nftablesV6RootTable.(dpsets.IPSetsDataplane)
ipSetsV6 = nftablesV6RootTable
} else {
mangleTableV6 = iptables.NewTable(
"mangle",
Expand Down Expand Up @@ -1038,6 +1044,11 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
dp.RegisterManager(newRawEgressPolicyManager(rawTableV6, ruleRenderer, 6, ipSetsV6.SetFilter, config.RulesConfig.NFTables))
}

var nftMapsV6 nftables.MapsDataplane
if config.RulesConfig.NFTables {
nftMapsV6 = nftablesV6RootTable
}

dp.RegisterManager(newEndpointManager(
rawTableV6,
mangleTableV6,
Expand All @@ -1050,6 +1061,7 @@ func NewIntDataplaneDriver(config Config) *InternalDataplane {
config.RulesConfig.WorkloadIfacePrefixes,
dp.endpointStatusCombiner.OnEndpointStatusUpdate,
"",
nftMapsV6,
config.BPFEnabled,
nil,
callbacks,
Expand Down
4 changes: 4 additions & 0 deletions felix/generictables/match_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ type MatchCriteria interface {
NotICMPV6Type(t uint8) MatchCriteria
ICMPV6TypeAndCode(t, c uint8) MatchCriteria
NotICMPV6TypeAndCode(t, c uint8) MatchCriteria

// Only supported in nftables.
InInterfaceVMAP(mapname string) MatchCriteria
OutInterfaceVMAP(mapname string) MatchCriteria
}

type AddrType string
Expand Down
10 changes: 10 additions & 0 deletions felix/iptables/match_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,16 @@ func (m matchCriteria) NotICMPV6TypeAndCode(t, c uint8) generictables.MatchCrite
return append(m, fmt.Sprintf("-m icmp6 ! --icmpv6-type %d/%d", t, c))
}

// InInterfaceVMAP exists only so that matchCriteria satisfies the
// generictables.MatchCriteria interface. Matching the inbound interface
// against a verdict map is not supported by this iptables match builder, so
// calling it is treated as a programming error and reported via log.Panic.
//
// mapname is ignored.
func (m matchCriteria) InInterfaceVMAP(mapname string) generictables.MatchCriteria {
log.Panic("InInterfaceVMAP not supported in iptables")
// Not expected to be reached once log.Panic has fired; the return is
// present because the function must syntactically return a value.
return m
}

// OutInterfaceVMAP exists only so that matchCriteria satisfies the
// generictables.MatchCriteria interface. Matching the outbound interface
// against a verdict map is not supported by this iptables match builder, so
// calling it is treated as a programming error and reported via log.Panic.
//
// mapname is ignored.
func (m matchCriteria) OutInterfaceVMAP(mapname string) generictables.MatchCriteria {
log.Panic("OutInterfaceVMAP not supported in iptables")
// Not expected to be reached once log.Panic has fired; the return is
// present because the function must syntactically return a value.
return m
}

func PortsToMultiport(ports []uint16) string {
portFragments := make([]string, len(ports))
for i, port := range ports {
Expand Down
47 changes: 17 additions & 30 deletions felix/nftables/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/prometheus/client_golang/prometheus"
log "github.com/sirupsen/logrus"
"golang.org/x/sync/errgroup"
"sigs.k8s.io/knftables"

dpsets "github.com/projectcalico/calico/felix/dataplane/ipsets"
Expand Down Expand Up @@ -334,12 +335,12 @@ func (s *IPSets) ApplyUpdates() {

for attempt := 0; attempt < 10; attempt++ {
if attempt > 0 {
s.logCxt.Info("Retrying after an ipsets update failure...")
s.logCxt.Info("Retrying after an nftables set update failure...")
}
if s.resyncRequired {
// Compare our in-memory state against the dataplane and queue up
// modifications to fix any inconsistencies.
s.logCxt.Debug("Resyncing ipsets with dataplane.")
s.logCxt.Debug("Resyncing nftables sets with dataplane.")
s.opReporter.RecordOperation(fmt.Sprint("resync-nft-sets-v", s.IPVersionConfig.Family.Version()))

if err := s.tryResync(); err != nil {
Expand Down Expand Up @@ -404,47 +405,33 @@ func (s *IPSets) tryResync() error {
type setData struct {
setName string
elems []*knftables.Element
err error
}
setsChan := make(chan setData)
defer close(setsChan)

// Start a goroutine to list the elements of each set. Limit concurrent set reads to
// avoid spawning too many goroutines if there are a large number of sets.
routineLimit := make(chan struct{}, 100)
defer close(routineLimit)
for _, setName := range sets {
// Wait for room in the limiting channel.
routineLimit <- struct{}{}
// Create an errgroup to wait for all the set reads to complete.
g, egCtx := errgroup.WithContext(ctx)
g.SetLimit(100)
responses := make([]setData, len(sets))

for i, name := range sets {
// Start a goroutine to read this set.
go func(name string) {
// Make sure to indicate that we're done by removing ourselves from the limiter channel.
defer func() { <-routineLimit }()

elems, err := s.nft.ListElements(ctx, "set", name)
g.Go(func() error {
elems, err := s.nft.ListElements(egCtx, "set", name)
if err != nil {
setsChan <- setData{setName: name, err: err}
return
return err
}
setsChan <- setData{setName: name, elems: elems}
}(setName)
responses[i] = setData{setName: name, elems: elems}
return nil
})
}

// We expect a response for every set we asked for.
responses := make([]setData, len(sets))
for i := range responses {
setData := <-setsChan
responses[i] = setData
// Wait for all the set reads to complete.
if err := g.Wait(); err != nil {
return fmt.Errorf("failed to list set elements: %w", err)
}

for _, setData := range responses {
setName := setData.setName
logCxt := s.logCxt.WithField("setName", setName)
if setData.err != nil {
logCxt.WithError(err).Error("Failed to list set elements.")
return setData.err
}

// TODO: We need to be able to extract the set type from the dataplane, otherwise we cannot
// tell whether or not an IP set has the correct type.
Expand Down
18 changes: 18 additions & 0 deletions felix/nftables/ipsets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,24 @@ var _ = Describe("IPSets with empty data plane", func() {
))
})

It("should resync with a large number of sets", func() {
// Create a large number of sets - larger than the number of goroutines we limit
// ourselves to in the resync code.
tx := f.NewTransaction()
tx.Add(&knftables.Table{})
for i := 0; i < 200; i++ {
tx.Add(&knftables.Set{
Name: fmt.Sprintf("set-%d", i),
Type: "ipv4_addr",
})
}
Expect(f.Run(context.Background(), tx)).NotTo(HaveOccurred())

// Trigger a resync.
s.QueueResync()
// ApplyUpdates is passed as a function value (not called here) so that the
// Panic matcher can invoke it and observe whether it panics. This only
// checks that the resync completes without panicking.
Expect(s.ApplyUpdates).NotTo(Panic())
})

It("should handle unexpected sets with types that are not supported", func() {
// Create an IP set directly in the dataplane, with a type that is not supported by the IPSets object.
tx := f.NewTransaction()
Expand Down
Loading
Loading