Skip to content

Commit

Permalink
Refactor test and validate permissions
Browse files Browse the repository at this point in the history
Signed-off-by: Kyle Fazzari <[email protected]>
  • Loading branch information
kyrofa committed Apr 8, 2020
1 parent cd0ee0d commit e7156ce
Showing 1 changed file with 15 additions and 7 deletions.
22 changes: 15 additions & 7 deletions sros2/test/sros2/commands/security/verbs/test_create_permission.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@
import rclpy
from ros2cli import cli
from sros2.api import _key, _keystore, _permission
from sros2.policy import get_transport_schema


_test_identity = '/talker_listener/talker'


# This fixture will run once for the entire module (as opposed to once per test)
Expand All @@ -30,16 +34,16 @@ def security_context_dir(tmpdir_factory, test_policy_dir) -> pathlib.Path:

# First, create the keystore as well as a keypair for the talker
assert _keystore.create_keystore(keystore_dir)
assert _key.create_key(keystore_dir, '/talker_listener/talker')
assert _key.create_key(keystore_dir, _test_identity)

security_files_dir = keystore_dir / 'contexts' / 'talker_listener' / 'talker'
security_files_dir = keystore_dir.joinpath(f'contexts{_test_identity}')
assert security_files_dir.is_dir()

# Now using that keystore, create a permissions file using the sample policy
policy_file_path = test_policy_dir / 'sample.policy.xml'
assert cli.main(
argv=[
'security', 'create_permission', str(keystore_dir), '/talker_listener/talker',
'security', 'create_permission', str(keystore_dir), _test_identity,
str(policy_file_path)]) == 0

# Return path to directory containing the identity's files
Expand All @@ -50,9 +54,13 @@ def test_create_permission(security_context_dir):
assert security_context_dir.joinpath('permissions.xml').is_file()
assert security_context_dir.joinpath('permissions.p7s').is_file()

# Give the generated permissions XML a smoke test
tree = lxml.etree.parse(str(security_context_dir.joinpath('permissions.xml')))

# Validate the schema
permissions_xsd_path = get_transport_schema('dds', 'permissions.xsd')
permissions_xsd = lxml.etree.XMLSchema(lxml.etree.parse(permissions_xsd_path))
permissions_xsd.assertValid(tree)

dds = tree.getroot()
assert dds.tag == 'dds'

Expand All @@ -61,7 +69,7 @@ def test_create_permission(security_context_dir):

grants = list(permissions[0].iterchildren(tag='grant'))
assert len(grants) == 1
assert grants[0].get('name') == '/talker_listener/talker'
assert grants[0].get('name') == _test_identity

allow_rules = list(grants[0].iterchildren(tag='allow_rule'))
if rclpy.get_rmw_implementation_identifier() in _permission._RMW_WITH_ROS_GRAPH_INFO_TOPIC:
Expand All @@ -78,12 +86,12 @@ def test_create_permission(security_context_dir):
published_topics_set = list(publish_rules[0].iterchildren(tag='topics'))
assert len(published_topics_set) == 1
published_topics = [c.text for c in published_topics_set[0].iterchildren(tag='topic')]
assert len(published_topics) == 15
assert len(published_topics) > 0

subscribed_topics_set = list(subscribe_rules[0].iterchildren(tag='topics'))
assert len(subscribed_topics_set) == 1
subscribed_topics = [c.text for c in subscribed_topics_set[0].iterchildren(tag='topic')]
assert len(subscribed_topics) == 14
assert len(subscribed_topics) > 0

# Verify that publication is allowed on chatter, but not subscription
assert 'rt/chatter' in published_topics
Expand Down

0 comments on commit e7156ce

Please sign in to comment.