diff --git a/rosidl_generator_tests/test/rosidl_generator_type_description/test_type_hash.py b/rosidl_generator_tests/test/rosidl_generator_type_description/test_type_hash.py
index 1785d11b9..d45671882 100644
--- a/rosidl_generator_tests/test/rosidl_generator_type_description/test_type_hash.py
+++ b/rosidl_generator_tests/test/rosidl_generator_type_description/test_type_hash.py
@@ -20,7 +20,7 @@
import fastjsonschema
-def test_type_hash():
+def test_type_hash() -> None:
"""Test all rosidl_generator_type_description output files against defined schemas."""
schema_path = (
Path(get_package_share_directory('rosidl_generator_type_description')) / 'resource' /
diff --git a/rosidl_generator_type_description/CMakeLists.txt b/rosidl_generator_type_description/CMakeLists.txt
index 2a3949a92..a8d9ccbd3 100644
--- a/rosidl_generator_type_description/CMakeLists.txt
+++ b/rosidl_generator_type_description/CMakeLists.txt
@@ -11,6 +11,7 @@ ament_python_install_package(${PROJECT_NAME})
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
find_package(ament_cmake_pytest REQUIRED)
+ find_package(ament_cmake_mypy REQUIRED)
ament_lint_auto_find_test_dependencies()
ament_add_pytest_test(pytest_type_hash_generator test)
endif()
diff --git a/rosidl_generator_type_description/package.xml b/rosidl_generator_type_description/package.xml
index b5cf5db84..ce4ad8974 100644
--- a/rosidl_generator_type_description/package.xml
+++ b/rosidl_generator_type_description/package.xml
@@ -23,6 +23,7 @@
ament_lint_auto
ament_lint_common
+ ament_cmake_mypy
rosidl_generator_packages
diff --git a/rosidl_generator_type_description/rosidl_generator_type_description/__init__.py b/rosidl_generator_type_description/rosidl_generator_type_description/__init__.py
index c62df1787..dd6445a94 100644
--- a/rosidl_generator_type_description/rosidl_generator_type_description/__init__.py
+++ b/rosidl_generator_type_description/rosidl_generator_type_description/__init__.py
@@ -18,45 +18,49 @@
from pathlib import Path
import re
import sys
-from typing import List, Tuple
+from typing import Dict, Final, List, Set, Tuple, TYPE_CHECKING, TypedDict, Union
from rosidl_parser import definition
from rosidl_parser.parser import parse_idl_file
+
+if TYPE_CHECKING:
+ from typing_extensions import NotRequired
+
# RIHS: ROS Interface Hashing Standard, per REP-2011
# NOTE: These values and implementations must be updated if
# - type_description_interfaces messsages change, or
# - the hashing algorithm for type descriptions changes
# Both changes require an increment of the RIHS version
-RIHS01_PREFIX = 'RIHS01_'
-RIHS01_HASH_VALUE_SIZE = 32
-RIHS01_PATTERN = re.compile(r'RIHS([0-9a-f]{2})_([0-9a-f]{64})')
+RIHS01_PREFIX: Final = 'RIHS01_'
+RIHS01_HASH_VALUE_SIZE: Final = 32
+RIHS01_PATTERN: Final = re.compile(r'RIHS([0-9a-f]{2})_([0-9a-f]{64})')
# Used by code generators to create variable names
-GET_DESCRIPTION_FUNC = 'get_type_description'
-GET_HASH_FUNC = 'get_type_hash'
-GET_INDIVIDUAL_SOURCE_FUNC = 'get_individual_type_description_source'
-GET_SOURCES_FUNC = 'get_type_description_sources'
+GET_DESCRIPTION_FUNC: Final = 'get_type_description'
+GET_HASH_FUNC: Final = 'get_type_hash'
+GET_INDIVIDUAL_SOURCE_FUNC: Final = 'get_individual_type_description_source'
+GET_SOURCES_FUNC: Final = 'get_type_description_sources'
-def to_type_name(namespaced_type):
+def to_type_name(namespaced_type: definition.NamespacedType) -> str:
return '/'.join(namespaced_type.namespaced_name())
class GenericInterface:
def __init__(
self, namespaced_type: definition.NamespacedType, members: List[definition.Member]
- ):
+ ) -> None:
self.namespaced_type = namespaced_type
self.members = members
-def add_msg(msg: definition.Message, to_dict: dict):
+def add_msg(msg: definition.Message, to_dict: Dict[str, GenericInterface]) -> None:
to_dict[to_type_name(msg.structure.namespaced_type)] = GenericInterface(
msg.structure.namespaced_type, msg.structure.members)
-def add_srv(srv: definition.Service, to_dict: dict):
+def add_srv(srv: definition.Service, to_dict: Dict[str, GenericInterface]) -> None:
service_members = [
definition.Member(srv.request_message.structure.namespaced_type, 'request_message'),
definition.Member(srv.response_message.structure.namespaced_type, 'response_message'),
@@ -69,7 +73,7 @@ def add_srv(srv: definition.Service, to_dict: dict):
add_msg(srv.event_message, to_dict)
-def add_action(action, to_dict):
+def add_action(action: definition.Action, to_dict: Dict[str, GenericInterface]) -> None:
action_members = [
definition.Member(action.goal.structure.namespaced_type, 'goal'),
definition.Member(action.result.structure.namespaced_type, 'result'),
@@ -88,7 +92,7 @@ def add_action(action, to_dict):
add_msg(action.feedback_message, to_dict)
-def generate_type_hash(generator_arguments_file: str) -> List[str]:
+def generate_type_hash(generator_arguments_file: str) -> List[Path]:
with open(generator_arguments_file, 'r') as f:
args = json.load(f)
package_name = args['package_name']
@@ -107,7 +111,7 @@ def generate_type_hash(generator_arguments_file: str) -> List[str]:
include_map[include_package_name] = Path(include_base_path)
# Define all local IndividualTypeDescriptions
- individual_types = {}
+ individual_types: Dict[str, GenericInterface] = {}
for idl_tuple in idl_tuples:
idl_parts = idl_tuple.rsplit(':', 1)
assert len(idl_parts) == 2
@@ -131,7 +135,7 @@ def generate_type_hash(generator_arguments_file: str) -> List[str]:
add_action(el, individual_types)
# Determine needed includes for types from other packages
- pending_includes = set()
+ pending_includes: Set[Path] = set()
for individual_type in individual_types.values():
for member in individual_type.members:
if isinstance(member.type, definition.NamespacedType):
@@ -176,14 +180,14 @@ def generate_type_hash(generator_arguments_file: str) -> List[str]:
serialized_type_lookup[referenced_type['type_name']] = referenced_type
# Create fully-unrolled TypeDescription instances for local full types, and calculate hashes
- full_types = []
+ full_types: List['FullTypeDescriptionDict'] = []
for type_name, individual_type in individual_types.items():
full_type_description = extract_full_type_description(type_name, serialized_type_lookup)
full_types.append(full_type_description)
hash_lookup[type_name] = calculate_type_hash(full_type_description)
# Write JSON output for each full TypeDescription
- generated_files = []
+ generated_files: List[Path] = []
for full_type_description in full_types:
top_type_name = full_type_description['type_description']['type_name']
hashes = [{
@@ -220,7 +224,7 @@ def parse_rihs_string(rihs_str: str) -> Tuple[int, str]:
# This mapping must match the constants defined in type_description_interfaces/msgs/FieldType.msg
# NOTE: Nonexplicit integer types are not defined in FieldType (short, long, long long).
# If a ROS IDL uses these, this generator will throw a KeyError.
-FIELD_VALUE_TYPE_NAMES = {
+FIELD_VALUE_TYPE_NAMES: Final = {
None: 'FIELD_TYPE_NOT_SET',
'nested_type': 'FIELD_TYPE_NESTED_TYPE',
'int8': 'FIELD_TYPE_INT8',
@@ -245,14 +249,14 @@ def parse_rihs_string(rihs_str: str) -> Tuple[int, str]:
definition.BoundedWString: 'FIELD_TYPE_BOUNDED_WSTRING',
}
-NESTED_FIELD_TYPE_SUFFIXES = {
+NESTED_FIELD_TYPE_SUFFIXES: Final = {
definition.Array: '_ARRAY',
definition.BoundedSequence: '_BOUNDED_SEQUENCE',
definition.UnboundedSequence: '_UNBOUNDED_SEQUENCE',
}
# Copied directly from FieldType.msg, with simple string manipulation to create a dict
-FIELD_TYPE_NAME_TO_ID = {
+FIELD_TYPE_NAME_TO_ID: Final = {
'FIELD_TYPE_NOT_SET': 0,
# Nested type defined in other .msg/.idl files.
@@ -369,11 +373,17 @@ def parse_rihs_string(rihs_str: str) -> Tuple[int, str]:
'FIELD_TYPE_BOUNDED_WSTRING_UNBOUNDED_SEQUENCE': 166,
}
-FIELD_TYPE_ID_TO_NAME = {
+FIELD_TYPE_ID_TO_NAME: Final = {
val: key for key, val in FIELD_TYPE_NAME_TO_ID.items()
}
+FIELD_VALUE_STRING_TYPES: Final = (definition.UnboundedString,
+ definition.BoundedString,
+ definition.BoundedWString,
+ definition.UnboundedWString)
+
+
def field_type_type_name(ftype: definition.AbstractType) -> str:
value_type = ftype
name_suffix = ''
@@ -384,7 +394,7 @@ def field_type_type_name(ftype: definition.AbstractType) -> str:
if isinstance(value_type, definition.BasicType):
value_type_name = FIELD_VALUE_TYPE_NAMES[value_type.typename]
- elif isinstance(value_type, definition.AbstractGenericString):
+ elif isinstance(value_type, FIELD_VALUE_STRING_TYPES):
value_type_name = FIELD_VALUE_TYPE_NAMES[type(value_type)]
elif (
isinstance(value_type, definition.NamespacedType) or
@@ -397,35 +407,31 @@ def field_type_type_name(ftype: definition.AbstractType) -> str:
return value_type_name + name_suffix
-def field_type_type_id(ftype: definition.AbstractType) -> Tuple[str, int]:
+def field_type_type_id(ftype: definition.AbstractType) -> int:
return FIELD_TYPE_NAME_TO_ID[field_type_type_name(ftype)]
def field_type_capacity(ftype: definition.AbstractType) -> int:
- if isinstance(ftype, definition.AbstractNestedType):
- if ftype.has_maximum_size():
- try:
- return ftype.maximum_size
- except AttributeError:
- return ftype.size
+ if isinstance(ftype, definition.Array):
+ return ftype.size
+ elif isinstance(ftype, definition.BoundedSequence):
+ return ftype.maximum_size
+
return 0
-def field_type_string_capacity(ftype: definition.AbstractType) -> int:
+def field_type_string_capacity(ftype: definition.AbstractType) -> Union[int, str]:
value_type = ftype
if isinstance(ftype, definition.AbstractNestedType):
value_type = ftype.value_type
- if isinstance(value_type, definition.AbstractGenericString):
- if value_type.has_maximum_size():
- try:
- return value_type.maximum_size
- except AttributeError:
- return value_type.size
+ if isinstance(value_type, (definition.BoundedString, definition.BoundedWString)):
+ return value_type.maximum_size
+
return 0
-def field_type_nested_type_name(ftype: definition.AbstractType, joiner='/') -> str:
+def field_type_nested_type_name(ftype: definition.AbstractType, joiner: str = '/') -> str:
value_type = ftype
if isinstance(ftype, definition.AbstractNestedType):
value_type = ftype.value_type
@@ -436,7 +442,14 @@ def field_type_nested_type_name(ftype: definition.AbstractType, joiner='/') -> s
return ''
-def serialize_field_type(ftype: definition.AbstractType) -> dict:
+class SerializeFieldTypeDict(TypedDict):
+ type_id: int
+ capacity: int
+ string_capacity: Union[int, str]
+ nested_type_name: str
+
+
+def serialize_field_type(ftype: definition.AbstractType) -> SerializeFieldTypeDict:
return {
'type_id': field_type_type_id(ftype),
'capacity': field_type_capacity(ftype),
@@ -445,26 +458,40 @@ def serialize_field_type(ftype: definition.AbstractType) -> dict:
}
-def serialize_field(member: definition.Member) -> dict:
+class SerializeFieldDict(TypedDict):
+ name: str
+ type: SerializeFieldTypeDict # noqa: A003
+ default_value: 'NotRequired[str]'
+
+
+def serialize_field(member: definition.Member) -> SerializeFieldDict:
+ default_value = ''
+ if member.has_annotation('default'):
+ annotation_value = member.get_annotation_value('default')
+ if isinstance(annotation_value, dict):
+ default_value = str(annotation_value['value'])
+
return {
'name': member.name,
'type': serialize_field_type(member.type),
- 'default_value':
- str(member.get_annotation_value('default')['value'])
- if member.has_annotation('default') else ''
- }
+ 'default_value': default_value}
+
+
+class SerializeIndividualTypeDescriptionDict(TypedDict):
+ type_name: str
+ fields: List[SerializeFieldDict]
def serialize_individual_type_description(
namespaced_type: definition.NamespacedType, members: List[definition.Member]
-) -> dict:
+) -> SerializeIndividualTypeDescriptionDict:
return {
'type_name': to_type_name(namespaced_type),
'fields': [serialize_field(member) for member in members]
}
-def calculate_type_hash(serialized_type_description):
+def calculate_type_hash(serialized_type_description: 'FullTypeDescriptionDict') -> str:
# Create a copy of the description, removing all default values
hashable_dict = deepcopy(serialized_type_description)
for field in hashable_dict['type_description']['fields']:
@@ -490,10 +517,18 @@ def calculate_type_hash(serialized_type_description):
return type_hash
-def extract_full_type_description(output_type_name, type_map):
+class FullTypeDescriptionDict(TypedDict):
+ type_description: SerializeIndividualTypeDescriptionDict
+ referenced_type_descriptions: List[SerializeIndividualTypeDescriptionDict]
+
+
+def extract_full_type_description(
+ output_type_name: str,
+ type_map: Dict[str, SerializeIndividualTypeDescriptionDict]
+) -> FullTypeDescriptionDict:
# Traverse reference graph to narrow down the references for the output type
output_type = type_map[output_type_name]
- output_references = set()
+ output_references: Set[str] = set()
process_queue = [
field['type']['nested_type_name']
for field in output_type['fields']
@@ -517,7 +552,10 @@ def extract_full_type_description(output_type_name, type_map):
}
-def extract_subinterface(type_description_msg: dict, field_name: str):
+def extract_subinterface(
+ type_description_msg: FullTypeDescriptionDict,
+ field_name: str
+) -> FullTypeDescriptionDict:
"""
Filter full TypeDescription to produce a TypeDescription for one of its fields' types.
diff --git a/rosidl_generator_type_description/rosidl_generator_type_description/py.typed b/rosidl_generator_type_description/rosidl_generator_type_description/py.typed
new file mode 100644
index 000000000..e69de29bb
diff --git a/rosidl_generator_type_description/test/test_serializers.py b/rosidl_generator_type_description/test/test_serializers.py
index 3a87b4296..6dd8c10ff 100644
--- a/rosidl_generator_type_description/test/test_serializers.py
+++ b/rosidl_generator_type_description/test/test_serializers.py
@@ -18,7 +18,7 @@
from rosidl_parser import definition
-def test_field_type_serializer():
+def test_field_type_serializer() -> None:
# Sanity check for the more complex capacity/string_capacity types and nesting
string_limit = 12
array_size = 22
@@ -34,28 +34,28 @@ def test_field_type_serializer():
assert result == expected
bounded_sequence_limit = 32
- test_type = definition.BoundedSequence(definition.UnboundedString(), bounded_sequence_limit)
+ test_type2 = definition.BoundedSequence(definition.UnboundedString(), bounded_sequence_limit)
expected = {
'type_id': 113,
'capacity': bounded_sequence_limit,
'string_capacity': 0,
'nested_type_name': '',
}
- result = serialize_field_type(test_type)
+ result = serialize_field_type(test_type2)
assert result == expected
- test_type = definition.BoundedWString(string_limit)
+ test_type3 = definition.BoundedWString(string_limit)
expected = {
'type_id': 22,
'capacity': 0,
'string_capacity': string_limit,
'nested_type_name': '',
}
- result = serialize_field_type(test_type)
+ result = serialize_field_type(test_type3)
assert result == expected
-def test_nested_type_serializer():
+def test_nested_type_serializer() -> None:
namespaced_type = definition.NamespacedType(['my_pkg', 'msg'], 'TestThing')
referenced_type = definition.NamespacedType(['other_pkg', 'msg'], 'RefThing')
nested_referenced_type = definition.UnboundedSequence(referenced_type)