From 52fed343ae13b63fae88e1fdb35717d2eadaa2a0 Mon Sep 17 00:00:00 2001 From: Matthias Bertschy Date: Tue, 5 Mar 2024 13:14:02 +0100 Subject: [PATCH] refactor locks to prevent race on get list Signed-off-by: Matthias Bertschy --- pkg/registry/file/storage.go | 45 +++++++++--------------------------- 1 file changed, 11 insertions(+), 34 deletions(-) diff --git a/pkg/registry/file/storage.go b/pkg/registry/file/storage.go index 8ee59021c..639988bfd 100644 --- a/pkg/registry/file/storage.go +++ b/pkg/registry/file/storage.go @@ -24,7 +24,6 @@ import ( ) const ( - clusterKey = "cluster" JsonExt = ".j" MetadataExt = ".m" DefaultStorageRoot = "/data" @@ -119,6 +118,11 @@ func isPayloadFile(path string) bool { return !IsMetadataFile(path) } +func (s *StorageImpl) keyFromPath(path string) string { + extension := filepath.Ext(path) + return strings.TrimPrefix(strings.TrimSuffix(path, extension), s.root) +} + func (s *StorageImpl) writeFiles(key string, obj runtime.Object, metaOut runtime.Object) error { // call processor on object to be saved if err := s.processor.PreSave(obj); err != nil { @@ -313,10 +317,6 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetList") span.SetAttributes(attribute.String("key", key)) defer span.End() - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.locks.RLock(key) - defer s.locks.RUnlock(key) - spanLock.End() listPtr, err := meta.GetItemsPtr(listObj) if err != nil { logger.L().Ctx(ctx).Error("get items ptr failed", helpers.Error(err), helpers.String("key", key)) @@ -346,20 +346,9 @@ func (s *StorageImpl) GetList(ctx context.Context, key string, _ storage.ListOpt }) } for _, path := range files { - // we need to read the whole file - file, err := s.appFs.Open(path) - if err != nil { - // skip if file is not readable, maybe it was deleted - continue - } - - obj, err := getUnmarshaledRuntimeObject(v, file) - if err != nil { - logger.L().Ctx(ctx).Error("unmarshal file failed", helpers.Error(err), helpers.String("path", path)) - continue + if err := s.appendJSONObjectFromFile(path, v); err != nil { + logger.L().Ctx(ctx).Error("appending JSON object from file failed", helpers.Error(err), helpers.String("path", path)) } - // append to list - v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) } return nil } @@ -541,9 +530,6 @@ func (s *StorageImpl) Count(key string) (int64, error) { p := filepath.Join(s.root, key) metadataPath := makeMetadataPath(p) - s.locks.RLock(key) - defer s.locks.RUnlock(key) - pathExists, _ := afero.Exists(s.appFs, metadataPath) pathIsDir, _ := afero.IsDir(s.appFs, metadataPath) if pathExists && !pathIsDir { @@ -576,10 +562,6 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetByNamespace") span.SetAttributes(attribute.String("apiVersion", apiVersion), attribute.String("kind", kind), attribute.String("namespace", namespace)) defer span.End() - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.locks.RLock(namespace) - defer s.locks.RUnlock(namespace) - spanLock.End() listPtr, err := meta.GetItemsPtr(listObj) if err != nil { @@ -602,7 +584,7 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name } if err := s.appendJSONObjectFromFile(path, v); err != nil { - logger.L().Ctx(ctx).Error("unmarshal file failed", helpers.Error(err), helpers.String("path", path)) + logger.L().Ctx(ctx).Error("appending JSON object from file failed", helpers.Error(err), helpers.String("path", path)) } return nil @@ -615,10 +597,6 @@ func (s *StorageImpl) GetByNamespace(ctx context.Context, apiVersion, kind, name func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion, kind string, listObj runtime.Object) error { ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetClusterScopedResource") defer span.End() - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.locks.RLock(clusterKey) - defer s.locks.RUnlock(clusterKey) - spanLock.End() listPtr, err := meta.GetItemsPtr(listObj) if err != nil { @@ -661,10 +639,6 @@ func (s *StorageImpl) GetClusterScopedResource(ctx context.Context, apiVersion, func (s *StorageImpl) GetByCluster(ctx context.Context, apiVersion, kind string, listObj runtime.Object) error { ctx, span := otel.Tracer("").Start(ctx, "StorageImpl.GetByCluster") defer span.End() - _, spanLock := otel.Tracer("").Start(ctx, "waiting for lock") - s.locks.RLock(clusterKey) - defer s.locks.RUnlock(clusterKey) - spanLock.End() listPtr, err := meta.GetItemsPtr(listObj) if err != nil { @@ -709,6 +683,9 @@ func (s *StorageImpl) GetByCluster(ctx context.Context, apiVersion, kind string, // appendJSONObjectFromFile unmarshalls a json file into a runtime.Object and appends it to the underlying list object. func (s *StorageImpl) appendJSONObjectFromFile(path string, v reflect.Value) error { + key := s.keyFromPath(path) + s.locks.RLock(key) + defer s.locks.RUnlock(key) file, err := s.appFs.Open(path) if err != nil { // skip if file is not readable, maybe it was deleted