Skip to content

Commit

Permalink
Merge pull request #101 from kubescape/unmarshal
Browse files Browse the repository at this point in the history
refactor locks to prevent race on get list
  • Loading branch information
matthyx authored Mar 5, 2024
2 parents 77bf3fb + 52fed34 commit 13cd52e
Showing 1 changed file with 11 additions and 34 deletions.
45 changes: 11 additions & 34 deletions pkg/registry/file/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
)

const (
clusterKey = "cluster"
JsonExt = ".j"
MetadataExt = ".m"
DefaultStorageRoot = "/data"
Expand Down Expand Up @@ -119,6 +118,11 @@ func isPayloadFile(path string) bool {
return !IsMetadataFile(path)
}

func (s *StorageImpl) keyFromPath(path string) string {
extension := filepath.Ext(path)
return strings.TrimPrefix(strings.TrimSuffix(path, extension), s.root)
}

func (s *StorageImpl) writeFiles(key string, obj runtime.Object, metaOut runtime.Object) error {
// call processor on object to be saved
if err := s.processor.PreSave(obj); err != nil {
Expand Down Expand Up @@ -313,10 +317,6 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt
ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetList")
span.SetAttributes(attribute.String("key", key))
defer span.End()
_, spanLock := otel.Tracer("").Start(ctx, "waiting for lock")
s.locks.RLock(key)
defer s.locks.RUnlock(key)
spanLock.End()
listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
logger.L().Ctx(ctx).Error("get items ptr failed", helpers.Error(err), helpers.String("key", key))
Expand Down Expand Up @@ -346,20 +346,9 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt
})
}
for _, path := range files {
// we need to read the whole file
file, err := s.appFs.Open(path)
if err != nil {
// skip if file is not readable, maybe it was deleted
continue
}

obj, err := getUnmarshaledRuntimeObject(v, file)
if err != nil {
logger.L().Ctx(ctx).Error("unmarshal file failed", helpers.Error(err), helpers.String("path", path))
continue
if err := s.appendJSONObjectFromFile(path, v); err != nil {
logger.L().Ctx(ctx).Error("appending JSON object from file failed", helpers.Error(err), helpers.String("path", path))
}
// append to list
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
}
return nil
}
Expand Down Expand Up @@ -541,9 +530,6 @@ func (s *StorageImpl) Count(key string) (int64, error) {
p := filepath.Join(s.root, key)
metadataPath := makeMetadataPath(p)

s.locks.RLock(key)
defer s.locks.RUnlock(key)

pathExists, _ := afero.Exists(s.appFs, metadataPath)
pathIsDir, _ := afero.IsDir(s.appFs, metadataPath)
if pathExists && !pathIsDir {
Expand Down Expand Up @@ -576,10 +562,6 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name
ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetByNamespace")
span.SetAttributes(attribute.String("apiVersion", apiVersion), attribute.String("kind", kind), attribute.String("namespace", namespace))
defer span.End()
_, spanLock := otel.Tracer("").Start(ctx, "waiting for lock")
s.locks.RLock(namespace)
defer s.locks.RUnlock(namespace)
spanLock.End()

listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
Expand All @@ -602,7 +584,7 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name
}

if err := s.appendJSONObjectFromFile(path, v); err != nil {
logger.L().Ctx(ctx).Error("unmarshal file failed", helpers.Error(err), helpers.String("path", path))
logger.L().Ctx(ctx).Error("appending JSON object from file failed", helpers.Error(err), helpers.String("path", path))
}

return nil
Expand All @@ -615,10 +597,6 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name
func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion, kind string, listObj runtime.Object) error {
ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetClusterScopedResource")
defer span.End()
_, spanLock := otel.Tracer("").Start(ctx, "waiting for lock")
s.locks.RLock(clusterKey)
defer s.locks.RUnlock(clusterKey)
spanLock.End()

listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
Expand Down Expand Up @@ -661,10 +639,6 @@ func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion,
func (s *StorageImpl) GetByCluster(ctx context.Context, apiVersion, kind string, listObj runtime.Object) error {
ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetByCluster")
defer span.End()
_, spanLock := otel.Tracer("").Start(ctx, "waiting for lock")
s.locks.RLock(clusterKey)
defer s.locks.RUnlock(clusterKey)
spanLock.End()

listPtr, err := meta.GetItemsPtr(listObj)
if err != nil {
Expand Down Expand Up @@ -709,6 +683,9 @@ func (s *StorageImpl) GetByCluster(ctx context.Context, apiVersion, kind string,

// appendJSONObjectFromFile unmarshalls a json file into a runtime.Object and appends it to the underlying list object.
func (s *StorageImpl) appendJSONObjectFromFile(path string, v reflect.Value) error {
key := s.keyFromPath(path)
s.locks.RLock(key)
defer s.locks.RUnlock(key)
file, err := s.appFs.Open(path)
if err != nil {
// skip if file is not readable, maybe it was deleted
Expand Down

0 comments on commit 13cd52e

Please sign in to comment.