diff --git a/geonode/base/api/permissions.py b/geonode/base/api/permissions.py
index c5c36534234..878260d55a1 100644
--- a/geonode/base/api/permissions.py
+++ b/geonode/base/api/permissions.py
@@ -35,8 +35,7 @@
from geonode.groups.models import GroupProfile
from rest_framework.permissions import DjangoModelPermissions
from guardian.shortcuts import get_objects_for_user
-from itertools import chain
-from guardian.shortcuts import get_groups_with_perms
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -251,19 +250,10 @@ def has_permission(self, request, view):
)
# getting the user permission for that resource
- resource_perms = res.get_user_perms(request.user)
-
- groups = get_groups_with_perms(res, attach_perms=True)
- # we are making this because the request.user.groups sometimes returns empty si is not fully reliable
- for group, perm in groups.items():
- # checking if the user is in that group
- if group.user_set.filter(username=request.user).exists():
- resource_perms = list(chain(resource_perms, perm))
+ available_perms = permissions_registry.get_perms(instance=res, user=request.user)
if request.user.has_perm("base.add_resourcebase"):
- resource_perms.append("add_resourcebase")
- # merging all available permissions into a single list
- available_perms = list(set(resource_perms))
+ available_perms.append("add_resourcebase")
# fixup the permissions name
perms_without_base = [x.replace("base.", "") for x in perms]
# if at least one of the permissions is available the request is True
diff --git a/geonode/base/api/serializers.py b/geonode/base/api/serializers.py
index ac136d33bab..27a8a122a2e 100644
--- a/geonode/base/api/serializers.py
+++ b/geonode/base/api/serializers.py
@@ -69,6 +69,7 @@
from geonode.security.utils import get_resources_with_perms, get_geoapp_subtypes
from geonode.resource.models import ExecutionRequest
from django.contrib.gis.geos import Polygon
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -523,7 +524,11 @@ class Meta:
def to_representation(self, instance):
request = self.context.get("request", None)
resource = ResourceBase.objects.get(pk=instance)
- return resource.get_user_perms(request.user) if request and request.user and resource else []
+ return (
+ permissions_registry.get_perms(instance=resource, user=request.user)
+ if request and request.user and resource
+ else []
+ )
class LinksSerializer(DynamicModelSerializer):
diff --git a/geonode/base/api/tests.py b/geonode/base/api/tests.py
index e318fd06e61..009acd52d7e 100644
--- a/geonode/base/api/tests.py
+++ b/geonode/base/api/tests.py
@@ -93,6 +93,7 @@
)
from geonode.resource.api.tasks import resouce_service_dispatcher
from guardian.shortcuts import assign_perm
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -2373,7 +2374,11 @@ def test_resource_service_copy_with_perms_dataset_set_default_perms(self):
}
resource.set_permissions(_perms)
# checking that bobby is in the original dataset perms list
- self.assertTrue("bobby" in "bobby" in [x.username for x in resource.get_all_level_info().get("users", [])])
+ self.assertTrue(
+ "bobby"
+ in "bobby"
+ in [x.username for x in permissions_registry.get_perms(instance=resource).get("users", [])]
+ )
# copying the resource, should remove the perms for bobby
# only the default perms should be available
copy_url = reverse("importer_resource_copy", kwargs={"pk": resource.pk})
@@ -2388,8 +2393,14 @@ def test_resource_service_copy_with_perms_dataset_set_default_perms(self):
self.assertEqual("finished", self.client.get(response.json().get("status_url")).json().get("status"))
_resource = Dataset.objects.filter(title__icontains="test_copy_with_perms").last()
self.assertIsNotNone(_resource)
- self.assertNotIn("bobby", [x.username for x in _resource.get_all_level_info().get("users", [])])
- self.assertIn("admin", [x.username for x in _resource.get_all_level_info().get("users", [])])
+ self.assertNotIn(
+ "bobby",
+ [x.username for x in permissions_registry.get_perms(instance=_resource).get("users", [])],
+ )
+ self.assertIn(
+ "admin",
+ [x.username for x in permissions_registry.get_perms(instance=_resource).get("users", [])],
+ )
def test_resource_service_copy_with_perms_doc(self):
files = os.path.join(gisdata.GOOD_DATA, "vector/single_point.shp")
@@ -3427,7 +3438,7 @@ def test_simple_resourcebase_can_be_created_by_resourcemanager(self):
"groups": {anonymous_group: set(["view_resourcebase"])},
}
- actual_perms = resource.get_all_level_info().copy()
+ actual_perms = permissions_registry.get_perms(instance=resource).copy()
self.assertIsNotNone(actual_perms)
self.assertTrue(self.user in actual_perms["users"].keys())
self.assertTrue(anonymous_group in actual_perms["groups"].keys())
diff --git a/geonode/base/views.py b/geonode/base/views.py
index b08ca239a68..274ecb1af52 100644
--- a/geonode/base/views.py
+++ b/geonode/base/views.py
@@ -72,6 +72,7 @@
from geonode.base.forms import CategoryForm, TKeywordForm, ThesaurusAvailableForm
from geonode.base.models import Thesaurus, TopicCategory
+from geonode.security.registry import permissions_registry
from .forms import ResourceBaseForm
@@ -509,8 +510,7 @@ def resourcebase_embed(request, resourcebaseid, template="base/base_edit.html"):
# Call this first in order to be sure "perms_list" is correct
permissions_json = _perms_info_json(resourcebase_obj)
-
- perms_list = resourcebase_obj.get_user_perms(request.user)
+ perms_list = permissions_registry.get_perms(instance=resourcebase_obj, user=request.user)
group = None
if resourcebase_obj.group:
diff --git a/geonode/documents/tests.py b/geonode/documents/tests.py
index 69a82b78903..4de6e8ffd46 100644
--- a/geonode/documents/tests.py
+++ b/geonode/documents/tests.py
@@ -60,6 +60,7 @@
from geonode.upload.api.exceptions import FileUploadLimitException
from .forms import DocumentCreateForm
+from geonode.security.registry import permissions_registry
TEST_GIF = os.path.join(os.path.dirname(__file__), "tests/data/img.gif")
@@ -401,8 +402,8 @@ def test_set_document_permissions(self):
self.assertFalse(self.anonymous_user.has_perm("view_resourcebase", document.get_self_resource()))
# Test that previous permissions for users other than ones specified in
- # the perm_spec (and the document owner) were removed
- current_perms = document.get_all_level_info()
+ # the perm_spec (and the document owner) were
+ current_perms = permissions_registry.get_perms(instance=document)
self.assertEqual(len(current_perms["users"]), 1)
# Test that the User permissions specified in the perm_spec were
diff --git a/geonode/geoapps/views.py b/geonode/geoapps/views.py
index a20064f01b8..2d9277a24f0 100644
--- a/geonode/geoapps/views.py
+++ b/geonode/geoapps/views.py
@@ -46,6 +46,7 @@
from geonode.base.forms import CategoryForm, TKeywordForm, ThesaurusAvailableForm
from geonode.base.models import Thesaurus, TopicCategory
from geonode.utils import resolve_object
+from geonode.security.registry import permissions_registry
from .forms import GeoAppForm
@@ -106,7 +107,7 @@ def geoapp_edit(request, geoappid, template="apps/app_edit.html"):
# Call this first in order to be sure "perms_list" is correct
permissions_json = _perms_info_json(geoapp_obj)
- perms_list = geoapp_obj.get_user_perms(request.user)
+ perms_list = permissions_registry.get_perms(instance=geoapp_obj, user=request.user)
group = None
if geoapp_obj.group:
diff --git a/geonode/geoserver/security.py b/geonode/geoserver/security.py
index 4cb244e67e3..5cc93e77b93 100644
--- a/geonode/geoserver/security.py
+++ b/geonode/geoserver/security.py
@@ -37,6 +37,7 @@
from geonode.geoserver.helpers import geofence, gf_utils, gs_catalog
from geonode.groups.models import GroupProfile
from geonode.utils import get_dataset_workspace
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -349,8 +350,7 @@ def sync_resources_with_guardian(resource=None, force=False):
batch = AutoPriorityBatch(gf_utils.get_first_available_priority(), f"Sync resources {dataset}")
gf_utils.collect_delete_layer_rules(get_dataset_workspace(dataset), dataset.name, batch)
-
- perm_spec = dataset.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=dataset)
# All the other users
if "users" in perm_spec:
for user, perms in perm_spec["users"].items():
diff --git a/geonode/geoserver/tests/integration.py b/geonode/geoserver/tests/integration.py
index 39d8040e80f..06cb8a43e9a 100644
--- a/geonode/geoserver/tests/integration.py
+++ b/geonode/geoserver/tests/integration.py
@@ -42,6 +42,7 @@
from geonode.decorators import on_ogc_backend
from geonode.base.models import TopicCategory, Link
from geonode.geoserver.helpers import set_attributes_from_geoserver
+from geonode.security.registry import permissions_registry
LOCAL_TIMEOUT = 300
@@ -351,4 +352,5 @@ def test_default_anonymous_permissions(self):
saved_dataset.delete()
def get_user_resource_perms(self, instance, user):
- return list(instance.get_user_perms(user).union(instance.get_self_resource().get_user_perms(user)))
+ return permissions_registry.get_perms(instance=instance, user=user)
+ # return list(instance.get_user_perms(user).union(instance.get_self_resource().get_user_perms(user)))
diff --git a/geonode/groups/tests.py b/geonode/groups/tests.py
index 3b84941bc4f..5366ab515f1 100644
--- a/geonode/groups/tests.py
+++ b/geonode/groups/tests.py
@@ -37,6 +37,7 @@
from geonode.groups.models import GroupProfile, GroupMember, GroupCategory
from geonode.base.populate_test_data import all_public, create_models, remove_models, create_single_dataset
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -334,7 +335,7 @@ def test_perms_info(self):
# Add test to test perms being sent to the front end.
layer = Dataset.objects.first()
layer.set_default_permissions()
- perms_info = layer.get_all_level_info()
+ perms_info = permissions_registry.get_perms(instance=layer)
# Ensure there is only one group 'anonymous' by default
self.assertEqual(len(perms_info["groups"].keys()), 1)
@@ -695,7 +696,7 @@ def test_group_activity_pages_render(self):
try:
# Add test to test perms being sent to the front end.
dataset.set_default_permissions()
- perms_info = dataset.get_all_level_info()
+ perms_info = permissions_registry.get_perms(instance=dataset)
# Ensure there is only one group 'anonymous' by default
self.assertEqual(len(perms_info["groups"].keys()), 1)
diff --git a/geonode/layers/tests.py b/geonode/layers/tests.py
index d11ad69ec5f..d0a9d304227 100644
--- a/geonode/layers/tests.py
+++ b/geonode/layers/tests.py
@@ -76,6 +76,7 @@
from geonode.base.populate_test_data import all_public, create_models, remove_models, create_single_dataset
from geonode.layers.download_handler import DatasetDownloadHandler
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -641,7 +642,7 @@ def test_assign_change_dataset_data_perm(self):
layer = Dataset.objects.first()
user = get_anonymous_user()
layer.set_permissions({"users": {user.username: ["change_dataset_data"]}})
- perms = layer.get_all_level_info()
+ perms = permissions_registry.get_perms(instance=layer)
self.assertNotIn(user, perms["users"])
self.assertNotIn(user.username, perms["users"])
@@ -736,13 +737,13 @@ def test_surrogate_escape_string(self):
def test_assign_remove_permissions(self):
# Assing
layer = Dataset.objects.all().first()
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
self.assertNotIn(get_user_model().objects.get(username="norman"), perm_spec["users"])
utils.set_datasets_permissions(
"edit", resources_names=[layer.name], users_usernames=["norman"], delete_flag=False, verbose=True
)
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
_c = 0
if "users" in perm_spec:
for _u in perm_spec["users"]:
@@ -755,7 +756,7 @@ def test_assign_remove_permissions(self):
utils.set_datasets_permissions(
"read", resources_names=[layer.name], users_usernames=["norman"], delete_flag=True, verbose=True
)
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
_c = 0
if "users" in perm_spec:
for _u in perm_spec["users"]:
@@ -768,7 +769,7 @@ def test_assign_remove_permissions(self):
def test_assign_remove_permissions_for_groups(self):
# Assing
layer = Dataset.objects.all().first()
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
group_profile = GroupProfile.objects.create(slug="group1", title="group1", access="public")
self.assertNotIn(group_profile, perm_spec["groups"])
@@ -776,7 +777,7 @@ def test_assign_remove_permissions_for_groups(self):
utils.set_datasets_permissions(
"manage", resources_names=[layer.name], groups_names=["group1"], delete_flag=False, verbose=True
)
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
expected = {
"change_dataset_data",
"change_dataset_style",
@@ -795,7 +796,7 @@ def test_assign_remove_permissions_for_groups(self):
utils.set_datasets_permissions(
"view", resources_names=[layer.name], groups_names=["group1"], delete_flag=False, verbose=True
)
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
expected = {"view_resourcebase"}
# checking the perms list
self.assertSetEqual(expected, set(perm_spec["groups"][group_profile.group]))
@@ -804,7 +805,7 @@ def test_assign_remove_permissions_for_groups(self):
utils.set_datasets_permissions(
"view", resources_names=[layer.name], groups_names=["group1"], delete_flag=True, verbose=True
)
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
# checking the perms list
self.assertTrue(group_profile.group not in perm_spec["groups"])
@@ -1815,7 +1816,7 @@ def _create_arguments(self, perms_type, mode="set"):
def _assert_perms(self, expected_perms, dataset, username, assertion=True):
dataset.refresh_from_db()
- perms = dataset.get_all_level_info()
+ perms = permissions_registry.get_perms(instance=dataset)
if assertion:
self.assertTrue(username in [user.username for user in perms["users"]])
actual = set(
diff --git a/geonode/layers/utils.py b/geonode/layers/utils.py
index 2984614361c..ad8d4dd6603 100644
--- a/geonode/layers/utils.py
+++ b/geonode/layers/utils.py
@@ -48,6 +48,7 @@
from geonode.utils import check_ogc_backend
from geonode import GeoNodeException, geoserver
from geonode.layers.models import shp_exts, csv_exts, vec_exts, cov_exts, Dataset
+from geonode.security.registry import permissions_registry
READ_PERMISSIONS = ["view_resourcebase"]
WRITE_PERMISSIONS = ["change_dataset_data", "change_dataset_style", "change_resourcebase_metadata"]
@@ -357,7 +358,9 @@ def set_datasets_permissions(
# for the selected resource
resource = resource.first()
# getting the actual permissions available for the dataset
- original_perms = PermSpec(resource.get_all_level_info(), resource)
+ original_perms = PermSpec(
+ permissions_registry.get_perms(instance=resource, include_virtual=False), resource
+ )
new_perms_payload = {"organizations": [], "users": [], "groups": []}
# if the username is specified, we add them to the payload with the compact
# perm value
diff --git a/geonode/layers/views.py b/geonode/layers/views.py
index f7907326936..90efbec41ae 100644
--- a/geonode/layers/views.py
+++ b/geonode/layers/views.py
@@ -61,6 +61,7 @@
from geonode.people.forms import ProfileForm
from geonode.utils import check_ogc_backend, llbbox_to_mercator, resolve_object
from geonode.geoserver.helpers import ogc_server_settings
+from geonode.security.registry import permissions_registry
if check_ogc_backend(geoserver.BACKEND_PACKAGE):
from geonode.geoserver.helpers import gs_catalog
@@ -646,7 +647,7 @@ def dataset_metadata_detail(request, layername, template="datasets/dataset_metad
site_url = settings.SITEURL.rstrip("/") if settings.SITEURL.startswith("http") else settings.SITEURL
register_event(request, "view_metadata", layer)
- perms_list = layer.get_user_perms(request.user)
+ perms_list = permissions_registry.get_perms(instance=layer, user=request.user)
return render(
request,
diff --git a/geonode/people/tests.py b/geonode/people/tests.py
index 72e13207e0b..46e4047c8b7 100644
--- a/geonode/people/tests.py
+++ b/geonode/people/tests.py
@@ -37,6 +37,7 @@
from geonode.base.populate_test_data import all_public, create_models, remove_models
from django.db.models import Q
+from geonode.security.registry import permissions_registry
class PeopleAndProfileTests(GeoNodeBaseTestSupport):
@@ -133,7 +134,7 @@ def test_set_unset_user_dataset_permissions(self):
)
for layer in self.layers:
user = get_user_model().objects.first()
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
self.assertFalse(user in perm_spec["users"], f"{layer} - {user}")
@override_settings(ASYNC_SIGNALS=False)
@@ -166,7 +167,7 @@ def test_set_unset_group_dataset_permissions(self):
verbose=True,
)
for layer in self.layers:
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
self.assertTrue(self.groups[0] in perm_spec["groups"])
@override_settings(ASYNC_SIGNALS=False)
@@ -214,7 +215,7 @@ def test_unset_group_dataset_perms(self):
verbose=True,
)
for layer in self.layers:
- perm_spec = layer.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=layer)
self.assertTrue(user not in perm_spec["users"])
def test_forgot_username(self):
diff --git a/geonode/resource/manager.py b/geonode/resource/manager.py
index f5322a46c39..e2bb8cb2d91 100644
--- a/geonode/resource/manager.py
+++ b/geonode/resource/manager.py
@@ -45,7 +45,12 @@
from geonode.thumbs.utils import ThumbnailAlgorithms
from geonode.documents.tasks import create_document_thumbnail
from geonode.security.permissions import PermSpecCompact, DATA_STYLABLE_RESOURCES_SUBTYPES
-from geonode.security.utils import perms_as_set, get_user_groups, skip_registered_members_common_group
+from geonode.security.utils import (
+ perms_as_set,
+ get_user_groups,
+ skip_registered_members_common_group,
+)
+from geonode.security.registry import permissions_registry
from . import settings as rm_settings
from .utils import update_resource, resourcebase_post_save
@@ -564,7 +569,7 @@ def set_permissions(
# Gathering and validating the current permissions (if any has been passed)
if not created and permissions is None:
- permissions = _resource.get_all_level_info()
+ permissions = permissions_registry.get_perms(instance=_resource, include_virtual=False)
if permissions:
if PermSpecCompact.validate(permissions):
@@ -574,14 +579,16 @@ def set_permissions(
else:
_permissions = None
- # Fixup Advanced Workflow permissions
- _perm_spec = AdvancedSecurityWorkflowManager.get_permissions(
- _resource.uuid,
- instance=_resource,
- permissions=_permissions,
+ """
+ Align _perm_spec based on the permissions handlers
+ """
+ _perm_spec = permissions_registry.fixup_perms(
+ _resource,
+ _permissions,
created=created,
approval_status_changed=approval_status_changed,
group_status_changed=group_status_changed,
+ include_virtual=False,
)
"""
@@ -794,7 +801,7 @@ def _safe_assign_perm(perm, user_or_group, obj=None):
uuid,
instance=_resource,
owner=owner,
- permissions=_resource.get_all_level_info(),
+ permissions=permissions_registry.get_perms(instance=_resource),
created=created,
):
# This might not be a severe error. E.g. for datasets outside of local GeoServer
diff --git a/geonode/security/apps.py b/geonode/security/apps.py
index 3475f4d05f1..a5646d72bd0 100644
--- a/geonode/security/apps.py
+++ b/geonode/security/apps.py
@@ -22,3 +22,9 @@
class GeoNodeSecurityAppConfig(AppConfig):
name = "geonode.security"
verbose_name = "GeoNode Security"
+
+ def ready(self):
+ super().ready()
+ from geonode.security.registry import permissions_registry
+
+ permissions_registry.init_registry()
diff --git a/geonode/security/handlers.py b/geonode/security/handlers.py
new file mode 100644
index 00000000000..d1cb459282a
--- /dev/null
+++ b/geonode/security/handlers.py
@@ -0,0 +1,66 @@
+#########################################################################
+#
+# Copyright (C) 2024 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+#########################################################################
+from abc import ABC
+
+from geonode.security.utils import AdvancedSecurityWorkflowManager
+
+
+class BasePermissionsHandler(ABC):
+ """
+ Abstract permissions handler.
+ This is the base class, all the permissions instances should
+ inherit from this class.
+ All the flows that touches the permissions will use this class
+ (example advanced workflow)
+ """
+
+ def __str__(self):
+ return f"{self.__module__}.{self.__class__.__name__}"
+
+ def __repr__(self):
+ return self.__str__()
+
+ @staticmethod
+ def fixup_perms(instance, perms_payload, include_virtual, *args, **kwargs):
+ return perms_payload
+
+ @staticmethod
+ def get_perms(instance, perms_payload, user, include_virtual, *args, **kwargs):
+ """
+ By default we dont provide any additional perms
+ """
+ return perms_payload
+
+
+class AdvancedWorkflowPermissionsHandler(BasePermissionsHandler):
+ """
+ Handler that takes care of adjusting the permissions for the advanced workflow
+ """
+
+ @staticmethod
+ def fixup_perms(instance, perms_payload, include_virtual=True, *args, **kwargs):
+ # Fixup Advanced Workflow permissions
+ return AdvancedSecurityWorkflowManager.get_permissions(
+ instance.uuid,
+ instance=instance,
+ permissions=perms_payload,
+ created=kwargs.get("created"),
+ approval_status_changed=kwargs.get("approval_status_changed"),
+ group_status_changed=kwargs.get("group_status_changed"),
+ )
diff --git a/geonode/security/models.py b/geonode/security/models.py
index 5ee7c7b1100..f8e3b29d09f 100644
--- a/geonode/security/models.py
+++ b/geonode/security/models.py
@@ -48,6 +48,7 @@
)
from .utils import get_users_with_perms, get_user_obj_perms_model, skip_registered_members_common_group
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -185,7 +186,7 @@ def set_default_permissions(self, owner=None, created=False):
if not anonymous_group:
raise Exception("Could not acquire 'anonymous' Group.")
- perm_spec = copy.deepcopy(self.get_all_level_info())
+ perm_spec = copy.deepcopy(permissions_registry.get_perms(instance=self, include_virtual=False))
if "users" not in perm_spec:
perm_spec["users"] = {}
if "groups" not in perm_spec:
@@ -450,7 +451,7 @@ def user_can(self, user, permission):
"""
Checks if a has a given permission to the resource.
"""
- user_perms = self.get_user_perms(user)
+ user_perms = permissions_registry.get_perms(instance=self, user=user)
if permission not in user_perms:
# TODO cater for permissions with syntax base.permission_codename
diff --git a/geonode/security/registry.py b/geonode/security/registry.py
new file mode 100644
index 00000000000..cd924f2ebc8
--- /dev/null
+++ b/geonode/security/registry.py
@@ -0,0 +1,89 @@
+#########################################################################
+#
+# Copyright (C) 2024 OSGeo
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see .
+#
+#########################################################################
+from django.conf import settings
+from django.utils.module_loading import import_string
+from geonode.security.handlers import BasePermissionsHandler
+
+
+class PermissionsHandlerRegistry:
+
+ REGISTRY = []
+
+ def init_registry(self):
+ self._register()
+ self.sanity_checks()
+
+ def add(self, module_path):
+ item = import_string(module_path)()
+ self.__check_item(item)
+ self.REGISTRY.append(item)
+
+ def remove(self, module_path):
+ item = import_string(module_path)()
+ self.__check_item(item)
+ self.REGISTRY.remove(item)
+
+ def reset(self):
+ self.REGISTRY = []
+
+ def _register(self):
+ for module_path in settings.PERMISSIONS_HANDLERS:
+ self.add(module_path)
+
+ def sanity_checks(self):
+ for item in self.REGISTRY:
+ self.__check_item(item)
+
+ def __check_item(self, item):
+ """
+ Ensure that the handler is a subclass of BasePermissionsHandler
+ """
+ if not isinstance(item, BasePermissionsHandler):
+ raise Exception(f"Handler {item} is not a subclass of BasePermissionsHandler")
+
+ def fixup_perms(self, instance, payload, include_virtual=True, *args, **kwargs):
+ for handler in self.REGISTRY:
+ payload = handler.fixup_perms(instance, payload, include_virtual=include_virtual, *args, **kwargs)
+ return payload
+
+ def get_perms(self, instance, user=None, include_virtual=True, *args, **kwargs):
+ """
+ Return the payload with the permissions from the handlers.
+ The permissions payload can be edited by each permissions handler.
+ For example before return the payload, we can virtually remove perms
+ to the resource
+ """
+ if user:
+ payload = {"users": {user: instance.get_user_perms(user)}, "groups": {}}
+ else:
+ payload = instance.get_all_level_info()
+
+ for handler in self.REGISTRY:
+ payload = handler.get_perms(instance, payload, user, include_virtual=include_virtual, *args, **kwargs)
+
+ if user:
+ return payload["users"][user]
+ return payload
+
+ @classmethod
+ def get_registry(cls):
+ return PermissionsHandlerRegistry.REGISTRY
+
+
+permissions_registry = PermissionsHandlerRegistry()
diff --git a/geonode/security/tests.py b/geonode/security/tests.py
index 2c634ecabea..bcd690c0c54 100644
--- a/geonode/security/tests.py
+++ b/geonode/security/tests.py
@@ -47,6 +47,7 @@
from geonode.layers.models import Dataset
from geonode.documents.models import Document
from geonode.compat import ensure_string
+from geonode.security.handlers import BasePermissionsHandler
from geonode.upload.models import ResourceHandlerInfo
from geonode.utils import check_ogc_backend
from geonode.tests.utils import check_dataset
@@ -56,6 +57,7 @@
from geonode.groups.models import Group, GroupMember, GroupProfile
from geonode.layers.populate_datasets_data import create_dataset_data
from geonode.base.auth import create_auth_token, get_or_create_token
+from geonode.security.registry import permissions_registry
from geonode.base.models import Configuration, UserGeoLimit, GroupGeoLimit
from geonode.base.populate_test_data import (
@@ -937,7 +939,7 @@ def test_set_dataset_permissions(self):
# Test that previous permissions for users other than ones specified in
# the perm_spec (and the layers owner) were removed
- current_perms = layer.get_all_level_info()
+ current_perms = permissions_registry.get_perms(instance=layer)
self.assertGreaterEqual(len(current_perms["users"]), 1)
# Test that there are no duplicates on returned permissions
@@ -1897,7 +1899,7 @@ def test_set_compact_permissions(self):
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
@@ -1979,7 +1981,7 @@ def test_permissions_are_set_as_expected_resource_publishing_True(self):
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
@@ -2052,7 +2054,9 @@ def test_permissions_are_set_as_expected_admin_upload_resource_publishing_True(s
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [
+ x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)
+ ]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
@@ -2123,7 +2127,7 @@ def test_permissions_are_set_as_expected_admin_upload_resource_publishing_False(
permissions, expected = item
self.resource.set_permissions(permissions)
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms),
set(perms_got),
@@ -2178,7 +2182,7 @@ def test_permissions_on_user_role_promotion_to_manager(self):
sut.refresh_from_db()
self.assertEqual(sut.role, "manager")
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}"
)
@@ -2210,7 +2214,7 @@ def test_permissions_on_user_role_demote_to_member(self):
self.group_member: ["download_resourcebase", "view_resourcebase"],
}
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}"
)
@@ -2243,7 +2247,7 @@ def test_permissions_on_user_role_demote_to_member_only_RESOURCE_PUBLISHING_acti
self.group_member: ["download_resourcebase", "view_resourcebase"],
}
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}"
)
@@ -2299,7 +2303,7 @@ def test_permissions_on_user_role_promote_to_manager_only_RESOURCE_PUBLISHING_ac
],
}
for authorized_subject, expected_perms in expected.items():
- perms_got = [x for x in self.resource.get_self_resource().get_user_perms(authorized_subject)]
+ perms_got = [x for x in permissions_registry.get_perms(instance=self.resource, user=authorized_subject)]
self.assertSetEqual(
set(expected_perms), set(perms_got), msg=f"use case #0 - user: {authorized_subject.username}"
)
@@ -2311,7 +2315,7 @@ def test_if_anonymoys_default_perms_is_false_should_not_assign_perms_to_user_gro
"""
resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member})
- self.assertFalse(self.group_profile.group in resource.get_all_level_info()["groups"].keys())
+ self.assertFalse(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys())
@override_settings(DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION=False)
def test_if_anonymoys_default_download_perms_is_false_should_not_assign_perms_to_user_group(self):
@@ -2320,7 +2324,7 @@ def test_if_anonymoys_default_download_perms_is_false_should_not_assign_perms_to
"""
resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member})
- self.assertFalse(self.group_profile.group in resource.get_all_level_info()["groups"].keys())
+ self.assertFalse(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys())
@override_settings(DEFAULT_ANONYMOUS_DOWNLOAD_PERMISSION=False)
@override_settings(RESOURCE_PUBLISHING=True)
@@ -2331,8 +2335,8 @@ def test_if_anonymoys_default_perms_is_false_should_assign_perms_to_user_group_i
"""
resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member})
- self.assertTrue(self.group_profile.group in resource.get_all_level_info()["groups"].keys())
- group_val = resource.get_all_level_info()["groups"][self.group_profile.group]
+ self.assertTrue(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys())
+ group_val = permissions_registry.get_perms(instance=resource)["groups"][self.group_profile.group]
self.assertSetEqual({"view_resourcebase", "download_resourcebase"}, set(group_val))
@override_settings(DEFAULT_ANONYMOUS_VIEW_PERMISSION=False)
@@ -2347,8 +2351,8 @@ def test_if_anonymoys_default_perms_is_false_should_assign_perms_to_user_group_i
resource = resource_manager.create(str(uuid.uuid4), Dataset, defaults={"owner": self.group_member})
- self.assertTrue(self.group_profile.group in resource.get_all_level_info()["groups"].keys())
- group_val = resource.get_all_level_info()["groups"][self.group_profile.group]
+ self.assertTrue(self.group_profile.group in permissions_registry.get_perms(instance=resource)["groups"].keys())
+ group_val = permissions_registry.get_perms(instance=resource)["groups"][self.group_profile.group]
self.assertSetEqual({"view_resourcebase", "download_resourcebase"}, set(group_val))
@@ -2409,7 +2413,7 @@ def setUp(self):
assign_perm(perm, self.member_with_perms, self.resource.get_self_resource())
# Assert inital assignment of permissions to groups and users
- resource_perm_specs = self.resource.get_all_level_info()
+ resource_perm_specs = permissions_registry.get_perms(instance=self.resource)
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms)
)
@@ -2458,7 +2462,7 @@ def test_owner_is_group_manager(self):
# Admin publishes and approves the resource
response = self.admin_approve_and_publish_resource()
self.assertEqual(response.status_code, 200)
- resource_perm_specs = self.resource.get_all_level_info()
+ resource_perm_specs = permissions_registry.get_perms(instance=self.resource)
# Once a resource has been published, the 'publish_resourcebase' permission should be removed anyway
self.assertSetEqual(
@@ -2469,7 +2473,7 @@ def test_owner_is_group_manager(self):
# Admin un-approves and un-publishes the resource
response = self.admin_unapprove_and_unpublish_resource()
self.assertEqual(response.status_code, 200)
- resource_perm_specs = self.resource.get_all_level_info()
+ resource_perm_specs = permissions_registry.get_perms(instance=self.resource)
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]),
@@ -2479,7 +2483,7 @@ def test_owner_is_group_manager(self):
GroupMember.objects.get(group=self.owner_group, user=self.author).demote()
def assertions_for_approved_or_published_is_true(self):
- resource_perm_specs = self.resource.get_all_level_info()
+ resource_perm_specs = permissions_registry.get_perms(instance=self.resource)
self.assertSetEqual(set(resource_perm_specs["users"][self.author]), set(self.owner_perms))
self.assertSetEqual(
set(resource_perm_specs["users"][self.member_with_perms]), set(self.owner_perms + self.dataset_perms)
@@ -2496,7 +2500,7 @@ def assertions_for_approved_or_published_is_true(self):
self.assertSetEqual(set(resource_perm_specs["groups"][self.resource_group.group]), set(self.safe_perms))
def assertions_for_approved_and_published_is_false(self):
- resource_perm_specs = self.resource.get_all_level_info()
+ resource_perm_specs = permissions_registry.get_perms(instance=self.resource)
self.assertSetEqual(
set(resource_perm_specs["users"][self.author]), set(self.owner_perms + self.edit_perms + self.dataset_perms)
)
@@ -2620,7 +2624,7 @@ def test_anonymous_user_is_stripped_off(self):
assign_perm(perm, get_anonymous_user(), resource)
assign_perm(perm, Group.objects.get(name="anonymous"), resource)
- perm_spec = resource.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=resource)
anonymous_user_perm = perm_spec["users"].get(get_anonymous_user())
self.assertEqual(anonymous_user_perm, None, "Anynmous user wasn't removed")
@@ -2662,3 +2666,43 @@ def test_user_can_publish(self):
# setting back the owner to admin
self.dataset.owner = self.admin
self.dataset.save()
+
+
+class DummyPermissionsHandler(BasePermissionsHandler):
+ @staticmethod
+ def fixup_perms(instance, perms_payload, *args, **kwargs):
+ return {"perms": ["this", "is", "fake"]}
+
+
+@override_settings(PERMISSIONS_HANDLERS=["geonode.security.handlers.AdvancedWorkflowPermissionsHandler"])
+class TestPermissionsRegistry(GeoNodeBaseTestSupport):
+ """
+ Test to verify the permissions registry
+ """
+
+ def tearDown(self):
+ permissions_registry.reset()
+
+ def test_registry_is_correctly_initiated(self):
+ """
+ The permissions registry should initiated correctly
+ """
+ permissions_registry.init_registry()
+ self.assertIsNotNone(permissions_registry.get_registry())
+
+ def test_new_handler_is_registered(self):
+ permissions_registry.add("geonode.security.tests.DummyPermissionsHandler")
+ reg = permissions_registry.get_registry()
+ self.assertTrue("geonode.security.tests.DummyPermissionsHandler" in (str(r) for r in reg))
+
+ def test_should_raise_exception_if_is_not_subclass(self):
+ with self.assertRaises(Exception):
+ permissions_registry.add(int)
+
+ def test_handler_should_handle_the_perms_payload(self):
+ # create resource
+ instance = create_single_dataset("fake_dataset")
+ # adding the dummy at the end, means will win over the other handler
+ permissions_registry.add("geonode.security.tests.DummyPermissionsHandler")
+ perms = permissions_registry.fixup_perms(instance, instance.get_all_level_info())
+ self.assertDictEqual({"perms": ["this", "is", "fake"]}, perms)
diff --git a/geonode/security/utils.py b/geonode/security/utils.py
index f39b0b28dd3..2038a03d06f 100644
--- a/geonode/security/utils.py
+++ b/geonode/security/utils.py
@@ -624,7 +624,9 @@ def get_permissions(
_permissions = copy.deepcopy(permissions)
if _resource:
- perm_spec = _permissions or copy.deepcopy(_resource.get_all_level_info())
+ from geonode.security.registry import permissions_registry
+
+ perm_spec = _permissions or copy.deepcopy(permissions_registry.get_perms(instance=_resource))
# Sanity checks
if isinstance(perm_spec, str):
@@ -688,6 +690,8 @@ def set_group_member_permissions(user, group, role):
If the user is demoted, we assign by default at least the view and the download permission
to the resource
"""
+ from geonode.security.registry import permissions_registry
+
# Fetching all the resources belonging to Group "group"; i.e. assgined to "group" metadata
queryset = get_objects_for_user(
user, ["base.view_resourcebase", "base.change_resourcebase"], any_perm=True
@@ -708,7 +712,7 @@ def set_group_member_permissions(user, group, role):
).filter(owner=user)
_resources = queryset.iterator()
for _r in _resources:
- perm_spec = _r.get_all_level_info()
+ perm_spec = permissions_registry.get_perms(instance=_r)
if "users" not in perm_spec:
perm_spec["users"] = {}
if "groups" not in perm_spec:
diff --git a/geonode/security/views.py b/geonode/security/views.py
index 6cf654d59d8..9c695c4fd9e 100644
--- a/geonode/security/views.py
+++ b/geonode/security/views.py
@@ -35,13 +35,13 @@
from geonode.groups.models import GroupProfile
from geonode.notifications_helper import send_notification
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
def _perms_info(obj):
- info = obj.get_all_level_info()
- return info
+ return permissions_registry.get_perms(instance=obj)
def _perms_info_json(obj):
diff --git a/geonode/services/views.py b/geonode/services/views.py
index c5c676721d5..3341fb7fcc1 100644
--- a/geonode/services/views.py
+++ b/geonode/services/views.py
@@ -39,6 +39,7 @@
from .models import Service
from . import forms, enumerations
from .serviceprocessors import get_service_handler
+from geonode.security.registry import permissions_registry
logger = logging.getLogger(__name__)
@@ -139,7 +140,7 @@ def harvest_resources_handle_get(request, service, handler):
{"id": "type-filter", "data_key": "type"},
]
- perms_list = service.get_user_perms(request.user)
+ perms_list = permissions_registry.get_perms(instance=service, user=request.user)
result = render(
request,
@@ -249,7 +250,7 @@ def service_detail(request, service_id):
permissions_json = _perms_info_json(service)
- perms_list = service.get_user_perms(request.user)
+ perms_list = permissions_registry.get_perms(instance=service, user=request.user)
harvested_resources_ids = []
if service.harvester:
diff --git a/geonode/settings.py b/geonode/settings.py
index 17532826c7d..e8c90bcd514 100644
--- a/geonode/settings.py
+++ b/geonode/settings.py
@@ -2328,3 +2328,6 @@ def get_geonode_catalogue_service():
]
INSTALLED_APPS += ("geonode.assets",)
GEONODE_APPS += ("geonode.assets",)
+
+
+PERMISSIONS_HANDLERS = ["geonode.security.handlers.AdvancedWorkflowPermissionsHandler"]
diff --git a/geonode/upload/tests/end2end/integration.py b/geonode/upload/tests/end2end/integration.py
index f1fb6276729..e5fc6ec7cc2 100644
--- a/geonode/upload/tests/end2end/integration.py
+++ b/geonode/upload/tests/end2end/integration.py
@@ -41,6 +41,7 @@
from geonode.tests.utils import upload_step, Client
from geonode.geoserver.helpers import ogc_server_settings, cascading_delete
from geonode.geoserver.signals import gs_catalog
+from geonode.security.registry import permissions_registry
from geoserver.catalog import Catalog
from gisdata import BAD_DATA
@@ -695,7 +696,7 @@ def get_wms_timepositions():
resp, data = self.client.upload_file(thefile, perms='{"users": {"AnonymousUser": []}, "groups":{}}')
_dataset = Dataset.objects.get(name=dataset_name)
_user = get_user_model().objects.get(username="AnonymousUser")
- self.assertEqual(_dataset.get_user_perms(_user).count(), 0)
+ self.assertEqual(permissions_registry.get_perms(instance=_dataset, user=_user).count(), 0)
# initial state is no positions or info
self.assertTrue(get_wms_timepositions() is None)
diff --git a/geonode/upload/tests/end2end/test_end2end.py b/geonode/upload/tests/end2end/test_end2end.py
index 176259b7199..eeece1c3881 100644
--- a/geonode/upload/tests/end2end/test_end2end.py
+++ b/geonode/upload/tests/end2end/test_end2end.py
@@ -130,7 +130,7 @@ def _assertimport(
# check if the dynamic model is created
if os.getenv("IMPORTER_ENABLE_DYN_MODELS", False):
- _schema_id = ModelSchema.objects.filter(name__icontains=initial_name)
+ _schema_id = ModelSchema.objects.filter(name__icontains=initial_name.lower().replace(" ", "_"))
self.assertTrue(_schema_id.exists())
schema_entity = _schema_id.first()
self.assertTrue(FieldSchema.objects.filter(model_schema=schema_entity).exists())
@@ -141,7 +141,8 @@ def _assertimport(
# check if the geonode resource exists
resource = ResourceBase.objects.filter(
- Q(alternate__icontains=f"geonode:{initial_name}") | Q(alternate__icontains=initial_name)
+ Q(alternate__icontains=f"geonode:{initial_name.lower().replace(' ', '_')}")
+ | Q(alternate__icontains=initial_name.lower().replace(" ", "_"))
)
self.assertTrue(resource.exists())
diff --git a/geonode/upload/tests/integration.py b/geonode/upload/tests/integration.py
index 4d9877f3a7c..4e2c113b483 100644
--- a/geonode/upload/tests/integration.py
+++ b/geonode/upload/tests/integration.py
@@ -42,6 +42,7 @@
from geonode.upload.utils import _ALLOW_TIME_STEP
from geonode.geoserver.helpers import ogc_server_settings, cascading_delete
from geonode.geoserver.signals import gs_catalog
+from geonode.security.registry import permissions_registry
from geoserver.catalog import Catalog
from gisdata import BAD_DATA
@@ -696,7 +697,7 @@ def get_wms_timepositions():
resp, data = self.client.upload_file(thefile, perms='{"users": {"AnonymousUser": []}, "groups":{}}')
_dataset = Dataset.objects.get(name=dataset_name)
_user = get_user_model().objects.get(username="AnonymousUser")
- self.assertEqual(_dataset.get_user_perms(_user).count(), 0)
+ self.assertEqual(permissions_registry.get_perms(instance=_dataset, user=_user).count(), 0)
# initial state is no positions or info
self.assertTrue(get_wms_timepositions() is None)