Skip to content

Commit

Permalink
other various cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
caseydavenport committed Oct 14, 2024
1 parent b6dbb0f commit 4ea9bce
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 50 deletions.
4 changes: 2 additions & 2 deletions felix/nftables/ipsets.go
Original file line number Diff line number Diff line change
Expand Up @@ -335,12 +335,12 @@ func (s *IPSets) ApplyUpdates() {

for attempt := 0; attempt < 10; attempt++ {
if attempt > 0 {
s.logCxt.Info("Retrying after an ipsets update failure...")
s.logCxt.Info("Retrying after an nftables set update failure...")
}
if s.resyncRequired {
// Compare our in-memory state against the dataplane and queue up
// modifications to fix any inconsistencies.
s.logCxt.Debug("Resyncing ipsets with dataplane.")
s.logCxt.Debug("Resyncing nftables sets with dataplane.")
s.opReporter.RecordOperation(fmt.Sprint("resync-nft-sets-v", s.IPVersionConfig.Family.Version()))

if err := s.tryResync(); err != nil {
Expand Down
107 changes: 59 additions & 48 deletions felix/nftables/maps.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,21 @@ import (
"sigs.k8s.io/knftables"
)

var (
	// gaugeVecNumMaps reports the number of active Calico nftables maps,
	// labelled by IP version.
	gaugeVecNumMaps = prometheus.NewGaugeVec(prometheus.GaugeOpts{
		Name: "felix_nft_maps",
		Help: "Number of active Calico nftables maps.",
	}, []string{"ip_version"})

	// countNumMapTransactions counts every nftables map transaction executed
	// (successful or not).
	countNumMapTransactions = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "felix_nft_map_calls",
		Help: "Number of nftables map transactions executed.",
	})

	// countNumMapErrors counts nftables map transactions that failed.
	countNumMapErrors = prometheus.NewCounter(prometheus.CounterOpts{
		Name: "felix_nft_map_errors",
		Help: "Number of nftables map transaction failures.",
	})
)

type MapType string

const MapTypeInterfaceMatch MapType = "interfaceMatch"
Expand Down Expand Up @@ -63,13 +78,13 @@ type Maps struct {
// subset of mapNameToAllMetadata.
// Its Dataplane() map contains all maps matching the IPVersionConfig
// that we think are in the dataplane. This includes any temporary IP
// sets and maps that we discovered on a resync (neither of which will
// maps and maps that we discovered on a resync (neither of which will
// have entries in the Desired() map).
mapNameToProgrammedMetadata *deltatracker.DeltaTracker[string, MapMetadata]

// mainSetNameToMembers contains entries for all maps that are in
// mapNameToAllMetadata along with entries for "main" (non-temporary) IP
// sets that we think are still in the dataplane. For maps that are in mapNameToAllMetadata, the
// mapNameToAllMetadata along with entries for "main" (non-temporary)
// maps that we think are still in the dataplane. For maps that are in mapNameToAllMetadata, the
// Desired() side of the tracker contains the members that we've been told
// about. Otherwise, Desired() is empty. The Dataplane() side of the
// tracker contains the members that are thought to be in the dataplane.
Expand All @@ -85,8 +100,11 @@ type Maps struct {
nft knftables.Interface

// function to determine if a chain exists in the dataplane. Needed to skip programming of entries
// until the requisite chains are created.
// until the requisite chains are programmed by the Table.
chainExists chainExistsFunc

// Callbacks to increment and decrement reference counts for chains so that chains
// referenced in maps are programmed by the Table implementation as needed.
increfChain func(chain string)
decrefChain func(chain string)
}
Expand Down Expand Up @@ -135,7 +153,7 @@ func NewMapsWithShims(
mapsWithDirtyMembers: set.New[string](),
resyncRequired: true,
logCxt: familyLogger,
gaugeNumMaps: gaugeVecNumSets.WithLabelValues(familyStr),
gaugeNumMaps: gaugeVecNumMaps.WithLabelValues(familyStr),
sleep: sleep,
nft: nft,
chainExists: chainExists,
Expand Down Expand Up @@ -183,13 +201,15 @@ func (s *Maps) AddOrReplaceMap(meta MapMetadata, members map[string][]string) {
s.updateDirtiness(name)
}

// maybeDecrefChain decrements the reference count of the chain referenced by
// the given MapMember, if it references one. Members that do not reference a
// chain are left untouched.
func (s *Maps) maybeDecrefChain(member MapMember) {
	if m, ok := member.(interfaceToChain); ok {
		s.decrefChain(m.chain)
	}
}

// maybeIncrefChain takes a MapMember and increfs any referenced chain (if it has one).
func (s *Maps) maybeIncrefChain(member MapMember) {
switch t := member.(type) {
case interfaceToChain:
Expand Down Expand Up @@ -233,8 +253,6 @@ func (s *Maps) RemoveMap(setID string) {
// nameForMainMap takes the given set ID and returns the name of the map as
// seen in nftables. This helper should be used to sanitize any set IDs,
// ensuring they are a consistent format.
func (s *Maps) nameForMainMap(setID string) string {
	// The legalized set ID is used directly as the nftables map name.
	name := LegalizeSetName(setID)
	return name
}

Expand Down Expand Up @@ -301,13 +319,7 @@ func (s *Maps) GetTypeOf(setID string) (MapType, error) {

func (s *Maps) filterAndCanonicaliseMembers(mtype MapType, members map[string][]string) set.Set[MapMember] {
filtered := set.New[MapMember]()
// wantIPV6 := s.IPVersionConfig.Family == ipsets.IPFamilyV6
for k, v := range members {
// TODO: Right now we only use interfaces in vmaps, no need for family filtering.
// isIPV6 := ipSetType.IsMemberIPV6(member)
// if wantIPV6 != isIPV6 {
// continue
// }
filtered.Add(CanonicaliseMapMember(mtype, k, v))
}
return filtered
Expand Down Expand Up @@ -344,13 +356,13 @@ func (s *Maps) ApplyMapUpdates() {

for attempt := 0; attempt < 10; attempt++ {
if attempt > 0 {
s.logCxt.Info("Retrying after an ipsets update failure...")
s.logCxt.Info("Retrying after an nftables map update failure...")
}
if s.resyncRequired {
// Compare our in-memory state against the dataplane and queue up
// modifications to fix any inconsistencies.
s.logCxt.Debug("Resyncing ipsets with dataplane.")
s.opReporter.RecordOperation(fmt.Sprint("resync-nft-sets-v", s.IPVersionConfig.Family.Version()))
s.logCxt.Debug("Resyncing maps with dataplane.")
s.opReporter.RecordOperation(fmt.Sprint("resync-nft-maps-v", s.IPVersionConfig.Family.Version()))

if err := s.tryResync(); err != nil {
s.logCxt.WithError(err).Warning("Failed to resync with dataplane")
Expand Down Expand Up @@ -383,17 +395,17 @@ func (s *Maps) tryResync() error {
resyncStart := time.Now()
defer func() {
s.logCxt.WithFields(log.Fields{
"resyncDuration": time.Since(resyncStart),
"ipSetsWithDirtyMembers": s.mapsWithDirtyMembers.Len(),
"ipSetsToCreateOrRecreate": s.mapNameToProgrammedMetadata.PendingUpdates().Len(),
"ipSetsToDelete": s.mapNameToProgrammedMetadata.PendingDeletions().Len(),
"resyncDuration": time.Since(resyncStart),
"mapsWithDirtyMembers": s.mapsWithDirtyMembers.Len(),
"mapsToCreateOrRecreate": s.mapNameToProgrammedMetadata.PendingUpdates().Len(),
"mapsToDelete": s.mapNameToProgrammedMetadata.PendingDeletions().Len(),
}).Debug("Finished Maps resync")
}()

// Clear the dataplane metadata view, we'll build it back up again as we scan.
s.mapNameToProgrammedMetadata.Dataplane().DeleteAll()

// Load sets from the dataplane. Update our Dataplane() maps with the actual contents
// Load from the dataplane. Update our Dataplane() maps with the actual contents
// of the data plane so that the next ApplyMapUpdates() call will be able to properly make
// incremental updates.
//
Expand All @@ -406,21 +418,21 @@ func (s *Maps) tryResync() error {
// Table doesn't exist - nothing to resync.
return nil
}
return fmt.Errorf("error listing nftables sets: %s", err)
return fmt.Errorf("error listing nftables maps: %s", err)
}

// We'll process each set in parallel, so we need a struct to hold the results.
// Once knftables is augmented to support reading many sets at once, we can remove this.
// Once knftables is augmented to support reading many maps at once, we can remove this.
type mapData struct {
name string
elems []*knftables.Element
err error
}
setsChan := make(chan mapData)
defer close(setsChan)
mapsCh := make(chan mapData)
defer close(mapsCh)

// Start a goroutine to list the elements of each set. Limit concurrent set reads to
// avoid spawning too many goroutines if there are a large number of sets.
// avoid spawning too many goroutines if there are a large number of maps.
routineLimit := make(chan struct{}, 100)
defer close(routineLimit)
for _, name := range maps {
Expand All @@ -434,17 +446,17 @@ func (s *Maps) tryResync() error {

elems, err := s.nft.ListElements(ctx, "map", name)
if err != nil {
setsChan <- mapData{name: name, err: err}
mapsCh <- mapData{name: name, err: err}
return
}
setsChan <- mapData{name: name, elems: elems}
mapsCh <- mapData{name: name, elems: elems}
}(name)
}

// We expect a response for every set we asked for.
responses := make([]mapData, len(maps))
for i := range responses {
setData := <-setsChan
setData := <-mapsCh
responses[i] = setData
}

Expand Down Expand Up @@ -481,9 +493,7 @@ func (s *Maps) tryResync() error {
case MapTypeInterfaceMatch:
strElems[e.Key[0]] = e.Value
default:
continue
// TODO:
// unknownElems.Add(UnknownMember(e.Key))
unknownElems.Add(UnknownMapMember(e.Key, e.Value))
}
}
elemsSet := s.filterAndCanonicaliseMembers(metadata.Type, strElems)
Expand All @@ -506,6 +516,7 @@ func (s *Maps) tryResync() error {
// TODO: Ideally we'd extract this information from the data plane itself, but it's not exposed
// via knftables at the moment.
s.mapNameToProgrammedMetadata.Dataplane().Set(mapName, MapMetadata{
ID: metadata.ID,
Type: metadata.Type,
})

Expand All @@ -520,7 +531,7 @@ func (s *Maps) tryResync() error {
s.updateDirtiness(mapName)
}

// Mark any sets that we didn't see as empty.
// Mark any maps that we didn't see as empty.
for name, members := range s.mainSetNameToMembers {
if _, ok := s.mapNameToProgrammedMetadata.Dataplane().Get(name); ok {
// In the dataplane, we should have updated its members above.
Expand Down Expand Up @@ -659,7 +670,7 @@ func (s *Maps) tryUpdates() error {
defer cancel()
if err := s.runTransaction(ctx, tx); err != nil {
s.logCxt.WithError(err).Errorf("Failed to update maps. %s", tx.String())
return fmt.Errorf("error updating nftables sets: %s", err)
return fmt.Errorf("error updating nftables maps: %s", err)
}

// If we get here, the writes were successful, reset the maps delta tracking now the
Expand Down Expand Up @@ -692,17 +703,17 @@ func (s *Maps) tryUpdates() error {
// ApplyDeletions tries to delete any maps that are no longer needed.
// Failures are ignored, deletions will be retried the next time we do a resync.
func (s *Maps) ApplyDeletions() bool {
// We rate limit the number of sets we delete in one go to avoid blocking the main loop for too long.
// nftables supports deleting multiple sets in a single transactions, which means we delete more at once
// We rate limit the number of maps we delete in one go to avoid blocking the main loop for too long.
// nftables supports deleting multiple maps in a single transactions, which means we delete more at once
// than the iptables dataplane which deletes one at a time.
maxDeletions := 500

tx := s.nft.NewTransaction()
deletedSets := set.New[string]()
deletedMaps := set.New[string]()
s.mapNameToProgrammedMetadata.PendingDeletions().Iter(func(mapName string) deltatracker.IterAction {
if deletedSets.Len() >= maxDeletions {
if deletedMaps.Len() >= maxDeletions {
// Deleting maps is slow (40ms) and serialised in the kernel. Avoid holding up the main loop
// for too long. We'll leave the remaining sets pending deletion and mop them up next time.
// for too long. We'll leave the remaining maps pending deletion and mop them up next time.
log.Debugf("Deleted batch of %d maps, rate limiting further map deletions.", maxDeletions)
// Leave the item in the set, so we'll do another batch of deletions next time around the loop.
return deltatracker.IterActionNoOpStopIteration
Expand All @@ -712,7 +723,7 @@ func (s *Maps) ApplyDeletions() bool {
logCxt := s.logCxt.WithField("mapName", mapName)
logCxt.Info("Deleting map in next transaction.")
tx.Delete(&knftables.Set{Name: mapName})
deletedSets.Add(mapName)
deletedMaps.Add(mapName)

if _, ok := s.mapNameToAllMetadata[mapName]; !ok {
// map is not just filtered out, clean up the members cache.
Expand All @@ -727,8 +738,8 @@ func (s *Maps) ApplyDeletions() bool {
return deltatracker.IterActionNoOp
})

if deletedSets.Len() > 0 {
s.logCxt.WithField("numSets", deletedSets.Len()).Info("Deleting maps.")
if deletedMaps.Len() > 0 {
s.logCxt.WithField("numMaps", deletedMaps.Len()).Info("Deleting maps.")
ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout)
defer cancel()
if err := s.runTransaction(ctx, tx); err != nil {
Expand All @@ -737,9 +748,9 @@ func (s *Maps) ApplyDeletions() bool {
}
}

// We need to clear pending deletions now that we have successfully deleted the sets.
// We need to clear pending deletions now that we have successfully deleted the maps.
s.mapNameToProgrammedMetadata.PendingDeletions().Iter(func(mapName string) deltatracker.IterAction {
if deletedSets.Contains(mapName) {
if deletedMaps.Contains(mapName) {
return deltatracker.IterActionUpdateDataplane
}
return deltatracker.IterActionNoOp
Expand All @@ -751,19 +762,19 @@ func (s *Maps) ApplyDeletions() bool {

// Determine if we need to be rescheduled.
numDeletionsPending := s.mapNameToProgrammedMetadata.PendingDeletions().Len()
if deletedSets.Len() == 0 {
if deletedMaps.Len() == 0 {
// We had nothing to delete, or we only encountered errors, don't
// ask to be rescheduled.
return false
}
return numDeletionsPending > 0 // Reschedule if we have sets left to delete.
return numDeletionsPending > 0 // Reschedule if we have maps left to delete.
}

// runTransaction executes the given nftables transaction, maintaining the
// Prometheus counters for the number of map transactions attempted and the
// number that failed.
func (s *Maps) runTransaction(ctx context.Context, tx *knftables.Transaction) error {
	countNumMapTransactions.Inc()
	if err := s.nft.Run(ctx, tx); err != nil {
		countNumMapErrors.Inc()
		return err
	}
	return nil
}
Expand Down
26 changes: 26 additions & 0 deletions felix/nftables/members.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,29 @@ func (u unknownMember) Key() []string {
func (u unknownMember) String() string {
return u.concat
}

// UnknownMapMember builds a MapMember for a map element whose type we do not
// know how to parse, preserving the raw key and value parts. A warning is
// logged so unexpected element types are visible.
func UnknownMapMember(k, v []string) MapMember {
	logrus.WithField("key", k).Warn("Unknown member type")
	member := unknownMapMember{
		kConcat: strings.Join(k, " . "),
		vConcat: strings.Join(v, " . "),
	}
	return member
}

// unknownMapMember represents a map element that we do not know how to parse.
// The raw key and value parts are stored joined with " . " so that they can
// be recovered via Key() and Value().
type unknownMapMember struct {
	kConcat string // key parts joined with " . "
	vConcat string // value parts joined with " . "
}

// Key returns the element's key split back into its constituent parts.
func (u unknownMapMember) Key() []string {
	parts := strings.Split(u.kConcat, " . ")
	return parts
}

// Value returns the element's value split back into its constituent parts.
func (u unknownMapMember) Value() []string {
	parts := strings.Split(u.vConcat, " . ")
	return parts
}

// String renders the element as "key -> value" for logging.
func (u unknownMapMember) String() string {
	var b strings.Builder
	b.WriteString(u.kConcat)
	b.WriteString(" -> ")
	b.WriteString(u.vConcat)
	return b.String()
}

0 comments on commit 4ea9bce

Please sign in to comment.