diff --git a/sros2/test/sros2/commands/security/verbs/test_create_permission.py b/sros2/test/sros2/commands/security/verbs/test_create_permission.py index 597cd2e2..a5ec09bd 100644 --- a/sros2/test/sros2/commands/security/verbs/test_create_permission.py +++ b/sros2/test/sros2/commands/security/verbs/test_create_permission.py @@ -21,6 +21,10 @@ import rclpy from ros2cli import cli from sros2.api import _key, _keystore, _permission +from sros2.policy import get_transport_schema + + +_test_identity = '/talker_listener/talker' # This fixture will run once for the entire module (as opposed to once per test) @@ -30,16 +34,16 @@ def security_context_dir(tmpdir_factory, test_policy_dir) -> pathlib.Path: # First, create the keystore as well as a keypair for the talker assert _keystore.create_keystore(keystore_dir) - assert _key.create_key(keystore_dir, '/talker_listener/talker') + assert _key.create_key(keystore_dir, _test_identity) - security_files_dir = keystore_dir / 'contexts' / 'talker_listener' / 'talker' + security_files_dir = keystore_dir.joinpath(f'contexts{_test_identity}') assert security_files_dir.is_dir() # Now using that keystore, create a permissions file using the sample policy policy_file_path = test_policy_dir / 'sample.policy.xml' assert cli.main( argv=[ - 'security', 'create_permission', str(keystore_dir), '/talker_listener/talker', + 'security', 'create_permission', str(keystore_dir), _test_identity, str(policy_file_path)]) == 0 # Return path to directory containing the identity's files @@ -50,9 +54,13 @@ def test_create_permission(security_context_dir): assert security_context_dir.joinpath('permissions.xml').is_file() assert security_context_dir.joinpath('permissions.p7s').is_file() - # Give the generated permissions XML a smoke test tree = lxml.etree.parse(str(security_context_dir.joinpath('permissions.xml'))) + # Validate the schema + permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd') + permissions_xsd = lxml.etree.XMLSchema(lxml.etree.parse(permissions_xsd_path)) + permissions_xsd.assertValid(tree) + dds = tree.getroot() assert dds.tag == 'dds' @@ -61,7 +69,7 @@ def test_create_permission(security_context_dir): grants = list(permissions[0].iterchildren(tag='grant')) assert len(grants) == 1 - assert grants[0].get('name') == '/talker_listener/talker' + assert grants[0].get('name') == _test_identity allow_rules = list(grants[0].iterchildren(tag='allow_rule')) if rclpy.get_rmw_implementation_identifier() in _permission._RMW_WITH_ROS_GRAPH_INFO_TOPIC: @@ -78,12 +86,12 @@ def test_create_permission(security_context_dir): published_topics_set = list(publish_rules[0].iterchildren(tag='topics')) assert len(published_topics_set) == 1 published_topics = [c.text for c in published_topics_set[0].iterchildren(tag='topic')] - assert len(published_topics) == 15 + assert len(published_topics) > 0 subscribed_topics_set = list(subscribe_rules[0].iterchildren(tag='topics')) assert len(subscribed_topics_set) == 1 subscribed_topics = [c.text for c in subscribed_topics_set[0].iterchildren(tag='topic')] - assert len(subscribed_topics) == 14 + assert len(subscribed_topics) > 0 # Verify that publication is allowed on chatter, but not subscription assert 'rt/chatter' in published_topics