Skip to content

Commit

Permalink
Merge pull request #133 from kubescape/resourceversion
Browse files Browse the repository at this point in the history
increment resourceVersion on successful updates
  • Loading branch information
matthyx authored Aug 21, 2024
2 parents 420e03b + fe00e15 commit 925c321
Show file tree
Hide file tree
Showing 10 changed files with 100 additions and 47 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/anchore/syft v1.3.0
github.com/armosec/utils-k8s-go v0.0.26
github.com/containers/common v0.59.0
github.com/deckarep/golang-set/v2 v2.5.0
github.com/deckarep/golang-set/v2 v2.6.0
github.com/go-logr/zapr v1.2.4
github.com/google/gofuzz v1.2.0
github.com/goradd/maps v0.1.5
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -723,8 +723,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/deckarep/golang-set/v2 v2.5.0 h1:hn6cEZtQ0h3J8kFrHR/NrzyOoTnjgW1+FmNJzQ7y/sA=
github.com/deckarep/golang-set/v2 v2.5.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/deckarep/golang-set/v2 v2.6.0 h1:XfcQbWM1LlMB8BsJ8N9vW5ehnnPVIw0je80NsVHagjM=
github.com/deckarep/golang-set/v2 v2.6.0/go.mod h1:VAky9rY/yGXJOLEDv3OMci+7wtDpOF4IN+y82NBOac4=
github.com/dgrijalva/jwt-go/v4 v4.0.0-preview1/go.mod h1:+hnT3ywWDTAFrW5aE+u2Sa/wT555ZqwoCS+pk3p6ry4=
github.com/docker/cli v24.0.0+incompatible h1:0+1VshNwBQzQAx9lOl+OYCTCEAD8fKs/qeXMx3O0wqM=
github.com/docker/cli v24.0.0+incompatible/go.mod h1:JLrzqnKDaYBop7H2jaqPtU4hHvMKP+vjCwu2uszcLI8=
Expand Down
8 changes: 8 additions & 0 deletions pkg/cleanup/cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ func (h *ResourcesCleanupHandler) StartCleanupTask() {
continue
}

var size int64
for resourceKind, handler := range resourceKindToHandler {
v1beta1ApiVersionPath := filepath.Join(h.root, softwarecomposition.GroupName, resourceKind)
exists, _ := afero.DirExists(h.appFs, v1beta1ApiVersionPath)
Expand All @@ -87,6 +88,11 @@ func (h *ResourcesCleanupHandler) StartCleanupTask() {
return err
}

// sum all files in path
if !info.IsDir() {
size += info.Size()
}

// FIXME: migrate to gob files - to remove after some time
if strings.HasSuffix(path, file.JsonExt) {
switch resourceKind {
Expand Down Expand Up @@ -151,6 +157,8 @@ func (h *ResourcesCleanupHandler) StartCleanupTask() {
}
}

logger.L().Info("storage size before cleanup", helpers.String("path", h.root), helpers.String("size", fmt.Sprintf("%d bytes", size)))

if h.interval == 0 {
break
}
Expand Down
10 changes: 5 additions & 5 deletions pkg/cleanup/cleanup_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

_ "embed"

sets "github.com/deckarep/golang-set/v2"
mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
"github.com/kubescape/storage/pkg/registry/file"
"github.com/spf13/afero"
Expand Down Expand Up @@ -239,9 +239,9 @@ var _ ResourcesFetcher = (*ResourcesFetchMock)(nil)

func (r *ResourcesFetchMock) FetchResources() (ResourceMaps, error) {
resourceMaps := ResourceMaps{
RunningInstanceIds: sets.NewSet[string](),
RunningContainerImageIds: sets.NewSet[string](),
RunningWlidsToContainerNames: new(maps.SafeMap[string, sets.Set[string]]),
RunningInstanceIds: mapset.NewSet[string](),
RunningContainerImageIds: mapset.NewSet[string](),
RunningWlidsToContainerNames: new(maps.SafeMap[string, mapset.Set[string]]),
}

var expectedImageIds []string
Expand All @@ -261,7 +261,7 @@ func (r *ResourcesFetchMock) FetchResources() (ResourceMaps, error) {
panic(err)
}
for wlid, containerNames := range expectedWlids {
resourceMaps.RunningWlidsToContainerNames.Set(wlid, sets.NewSet(containerNames...))
resourceMaps.RunningWlidsToContainerNames.Set(wlid, mapset.NewSet(containerNames...))
}

return resourceMaps, nil
Expand Down
22 changes: 11 additions & 11 deletions pkg/cleanup/discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"k8s.io/client-go/discovery"

wlidPkg "github.com/armosec/utils-k8s-go/wlid"
sets "github.com/deckarep/golang-set/v2"
mapset "github.com/deckarep/golang-set/v2"
"github.com/goradd/maps"
"github.com/kubescape/k8s-interface/instanceidhandler/v1"
"github.com/kubescape/k8s-interface/k8sinterface"
Expand All @@ -21,7 +21,7 @@ import (
)

var (
Workloads = sets.NewSet[string]([]string{
Workloads = mapset.NewSet[string]([]string{
"apiservice",
"configmap",
"clusterrole",
Expand Down Expand Up @@ -65,19 +65,19 @@ var _ ResourcesFetcher = (*KubernetesAPI)(nil)

// ResourceMaps is a map of running resources in the cluster, based on these maps we can decide which files to delete
type ResourceMaps struct {
RunningWlidsToContainerNames *maps.SafeMap[string, sets.Set[string]]
RunningInstanceIds sets.Set[string]
RunningContainerImageIds sets.Set[string]
RunningTemplateHash sets.Set[string]
RunningWlidsToContainerNames *maps.SafeMap[string, mapset.Set[string]]
RunningInstanceIds mapset.Set[string]
RunningContainerImageIds mapset.Set[string]
RunningTemplateHash mapset.Set[string]
}

// builds a map of running resources in the cluster needed for cleanup
func (h *KubernetesAPI) FetchResources() (ResourceMaps, error) {
resourceMaps := ResourceMaps{
RunningInstanceIds: sets.NewSet[string](),
RunningContainerImageIds: sets.NewSet[string](),
RunningTemplateHash: sets.NewSet[string](),
RunningWlidsToContainerNames: new(maps.SafeMap[string, sets.Set[string]]),
RunningInstanceIds: mapset.NewSet[string](),
RunningContainerImageIds: mapset.NewSet[string](),
RunningTemplateHash: mapset.NewSet[string](),
RunningWlidsToContainerNames: new(maps.SafeMap[string, mapset.Set[string]]),
}

if err := h.fetchInstanceIdsAndImageIdsAndReplicasFromRunningPods(&resourceMaps); err != nil {
Expand Down Expand Up @@ -107,7 +107,7 @@ func (h *KubernetesAPI) fetchWlidsFromRunningWorkloads(resourceMaps *ResourceMap
wlid := wlidPkg.GetK8sWLID("", workload.GetNamespace(), workload.GetKind(), workload.GetName())
wlid = wlidWithoutClusterName(wlid)

resourceMaps.RunningWlidsToContainerNames.Set(wlid, sets.NewSet[string]())
resourceMaps.RunningWlidsToContainerNames.Set(wlid, mapset.NewSet[string]())

c, ok := workloadinterface.InspectMap(workload.Object, append(workloadinterface.PodSpec(workload.GetKind()), "containers")...)
if !ok {
Expand Down
8 changes: 4 additions & 4 deletions pkg/registry/file/applicationprofile_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"strconv"

sets "github.com/deckarep/golang-set/v2"
mapset "github.com/deckarep/golang-set/v2"
"github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/storage/pkg/apis/softwarecomposition"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -39,7 +39,7 @@ func (a ApplicationProfileProcessor) PreSave(object runtime.Object) error {
profile.Spec.InitContainers = processContainers(profile.Spec.InitContainers)
profile.Spec.Containers = processContainers(profile.Spec.Containers)

profile.Spec.Architectures = sets.NewThreadUnsafeSet(profile.Spec.Architectures...).ToSlice()
profile.Spec.Architectures = mapset.Sorted(mapset.NewThreadUnsafeSet(profile.Spec.Architectures...))

profile.Annotations[helpers.ResourceSizeMetadataKey] = strconv.Itoa(size)
return nil
Expand All @@ -48,10 +48,10 @@ func (a ApplicationProfileProcessor) PreSave(object runtime.Object) error {
func deflateApplicationProfileContainer(container softwarecomposition.ApplicationProfileContainer) softwarecomposition.ApplicationProfileContainer {
return softwarecomposition.ApplicationProfileContainer{
Name: container.Name,
Capabilities: sets.NewThreadUnsafeSet(container.Capabilities...).ToSlice(),
Capabilities: mapset.Sorted(mapset.NewThreadUnsafeSet(container.Capabilities...)),
Execs: deflateStringer(container.Execs),
Opens: deflateStringer(container.Opens),
Syscalls: sets.NewThreadUnsafeSet(container.Syscalls...).ToSlice(),
Syscalls: mapset.Sorted(mapset.NewThreadUnsafeSet(container.Syscalls...)),
SeccompProfile: container.SeccompProfile,
}
}
8 changes: 4 additions & 4 deletions pkg/registry/file/networkneighborhood_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"fmt"
"strconv"

sets "github.com/deckarep/golang-set/v2"
mapset "github.com/deckarep/golang-set/v2"
"github.com/kubescape/k8s-interface/instanceidhandler/v1/helpers"
"github.com/kubescape/storage/pkg/apis/softwarecomposition"
"k8s.io/apimachinery/pkg/runtime"
Expand Down Expand Up @@ -57,7 +57,7 @@ func deflateNetworkNeighborhoodContainer(container softwarecomposition.NetworkNe
func deflateNetworkNeighbors(in []softwarecomposition.NetworkNeighbor) []softwarecomposition.NetworkNeighbor {
out := make([]softwarecomposition.NetworkNeighbor, 0)
seen := map[string]int{}
toDeflate := sets.NewThreadUnsafeSet[int]()
toDeflate := mapset.NewThreadUnsafeSet[int]()
for _, item := range in {
if index, ok := seen[item.Identifier]; ok {
out[index].DNSNames = append(out[index].DNSNames, item.DNSNames...)
Expand All @@ -68,8 +68,8 @@ func deflateNetworkNeighbors(in []softwarecomposition.NetworkNeighbor) []softwar
seen[item.Identifier] = len(out) - 1 // index of the appended item
}
}
for _, i := range toDeflate.ToSlice() {
out[i].DNSNames = sets.NewThreadUnsafeSet(out[i].DNSNames...).ToSlice()
for _, i := range mapset.Sorted(toDeflate) {
out[i].DNSNames = mapset.Sorted(mapset.NewThreadUnsafeSet(out[i].DNSNames...))
out[i].Ports = deflateStringer(out[i].Ports)
}
return out
Expand Down
4 changes: 2 additions & 2 deletions pkg/registry/file/processor.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package file

import (
sets "github.com/deckarep/golang-set/v2"
mapset "github.com/deckarep/golang-set/v2"
"k8s.io/apimachinery/pkg/runtime"
)

Expand All @@ -24,7 +24,7 @@ type Stringer interface {

func deflateStringer[T Stringer](in []T) []T {
out := make([]T, 0)
set := sets.NewThreadUnsafeSet[string]()
set := mapset.NewThreadUnsafeSet[string]()
for _, item := range in {
if set.Contains(item.String()) {
continue
Expand Down
32 changes: 23 additions & 9 deletions pkg/registry/file/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,13 +122,9 @@ func (s *StorageImpl) keyFromPath(path string) string {
}

func (s *StorageImpl) writeFiles(key string, obj runtime.Object, metaOut runtime.Object) error {
// call processor on object to be saved
if err := s.processor.PreSave(obj); err != nil {
return fmt.Errorf("processor.PreSave: %w", err)
}
// set resourceversion
if version, _ := s.versioner.ObjectResourceVersion(obj); version == 0 {
if err := s.versioner.UpdateObject(obj, 1); err != nil {
// increment resourceVersion
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil {
if err := s.versioner.UpdateObject(obj, version+1); err != nil {
return fmt.Errorf("set resourceVersion: %w", err)
}
}
Expand Down Expand Up @@ -188,18 +184,21 @@ func (s *StorageImpl) Create(ctx context.Context, key string, obj, metaOut runti
s.locks.Lock(key)
defer s.locks.Unlock(key)
spanLock.End()
// resourceversion should not be set on create
// resourceVersion should not be set on create
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
msg := "resourceVersion should not be set on objects to be created"
logger.L().Ctx(ctx).Error(msg)
return errors.New(msg)
}
// call processor on object to be saved
if err := s.processor.PreSave(obj); err != nil {
return fmt.Errorf("processor.PreSave: %w", err)
}
// write files
if err := s.writeFiles(key, obj, metaOut); err != nil {
logger.L().Ctx(ctx).Error("write files failed", helpers.Error(err), helpers.String("key", key))
return err
}

// publish event to watchers
s.watchDispatcher.Added(key, metaOut)
return nil
Expand Down Expand Up @@ -522,6 +521,21 @@ func (s *StorageImpl) GuaranteedUpdate(
continue
}

// call processor on object to be saved
if err := s.processor.PreSave(ret); err != nil {
return fmt.Errorf("processor.PreSave: %w", err)
}

// check if the object is the same as the original
orig := origState.obj.DeepCopyObject() // FIXME this is expensive
_ = s.processor.PreSave(orig)
if reflect.DeepEqual(orig, ret) {
logger.L().Debug("tryUpdate returned the same object, no update needed", helpers.String("key", key))
// no change, return the original object
v.Set(reflect.ValueOf(origState.obj).Elem())
return nil
}

// save to disk and fill into metaOut
err = s.writeFiles(key, ret, metaOut)
if err == nil {
Expand Down
49 changes: 40 additions & 9 deletions pkg/registry/file/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,19 @@ func TestStorageImpl_GuaranteedUpdate(t *testing.T) {
},
},
}
tutu := &v1beta1.SBOMSPDXv2p3{
ObjectMeta: v1.ObjectMeta{
Name: "toto",
ResourceVersion: "1",
},
Spec: v1beta1.SBOMSPDXv2p3Spec{
Metadata: v1beta1.SPDXMeta{
Tool: v1beta1.ToolMeta{
Name: "tutu",
},
},
},
}
type args struct {
key string
ignoreNotFound bool
Expand All @@ -466,11 +479,12 @@ func TestStorageImpl_GuaranteedUpdate(t *testing.T) {
cachedExistingObject runtime.Object
}
tests := []struct {
name string
args args
create bool
wantErr bool
want *v1beta1.SBOMSPDXv2p3
name string
args args
create bool
wantErr bool
wantNotFound bool
want *v1beta1.SBOMSPDXv2p3
}{
{
name: "test",
Expand All @@ -485,14 +499,27 @@ func TestStorageImpl_GuaranteedUpdate(t *testing.T) {
},
{
name: "test with existing object",
args: args{
key: "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape/toto",
tryUpdate: func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
obj := *input.(*v1beta1.SBOMSPDXv2p3)
obj.Spec.Metadata.Tool.Name = "tutu"
return &obj, nil, nil
},
cachedExistingObject: toto.DeepCopyObject(),
},
want: tutu,
},
{
name: "test with existing object, no change",
args: args{
key: "/spdx.softwarecomposition.kubescape.io/sbomspdxv2p3s/kubescape/toto",
tryUpdate: func(input runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) {
return input, nil, nil
},
cachedExistingObject: toto.DeepCopyObject(),
},
want: totov1,
wantNotFound: true, // no change, not found because we don't call writeFiles
},
{
name: "test with failing precondition",
Expand All @@ -516,7 +543,7 @@ func TestStorageImpl_GuaranteedUpdate(t *testing.T) {
return nil, nil, fmt.Errorf("tryUpdate error")
}
obj := *input.(*v1beta1.SBOMSPDXv2p3)
obj.ResourceVersion = "3"
obj.ResourceVersion = "2"
obj.Spec.Metadata.Tool.Name = "tutu"
return &obj, nil, nil
},
Expand All @@ -543,8 +570,12 @@ func TestStorageImpl_GuaranteedUpdate(t *testing.T) {
} else {
onDisk := &v1beta1.SBOMSPDXv2p3{}
err = s.Get(context.Background(), tt.args.key, storage.GetOptions{}, onDisk)
assert.NoError(t, err)
assert.Equal(t, tt.want, onDisk)
if tt.wantNotFound {
assert.Error(t, err)
} else {
assert.NoError(t, err)
assert.Equal(t, tt.want, onDisk)
}
}
})
}
Expand Down

0 comments on commit 925c321

Please sign in to comment.