diff --git a/.vscode/launch.json b/.vscode/launch.json new file mode 100644 index 0000000..494fac0 --- /dev/null +++ b/.vscode/launch.json @@ -0,0 +1,40 @@ +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false + }, + { + "name": "Python: Current File", + "type": "python", + "request": "launch", + "program": "${file}", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + "--package", + "usb_cam", + "--name", + "--all", + "--node", + "--model-path", + "/home/divya/ros2_ws/runner_op", + "--ws", + "/home/divya/ros2_ws", + "--repo", + "https://github.com/ros-drivers/usb_cam.git -b ros2", + "--path-to-src", + //"~/ros2_ws/src/ros_tutorials/turtlesim" + "~/ros2_ws/src/usb_cam" + ] + } + ] +} \ No newline at end of file diff --git a/haros_runner.py b/haros_runner.py new file mode 100644 index 0000000..8c43c25 --- /dev/null +++ b/haros_runner.py @@ -0,0 +1,44 @@ +#!/usr/bin/env python +import sys +import os +import subprocess +from datetime import datetime +import shutil +import glob + +from rosHumble_model_extractor_scripts.ros2runner import ros2Runner + +# Arguments: +# 1: Package name or Git repository name +# 2: Node name or launch file name or '--all' to analyse all the available nodes +# 3: Type of the request: either 'launch' or 'node' +# 4: Path to the folder where the resulting model files should be stored +# 5: Path to the ROS workspace +# 6: Http address links of the Git repositories (to indicate the branch put the name of the repository between quotes and add the suffix -b *banch_name*, for example "https://github.com/ipa320/ros-model-extractors b main" + +def main(): + + CLI_args = sys.argv[1:] + + if len(CLI_args) < 6: + print(f'ERROR: At least 6 arguments expected, only {len(CLI_args)} are given.') + sys.exit() + + pkgName = sys.argv[1] + NodeName = sys.argv[2] + typeOfRequest = sys.argv[3] + pathToOutput = sys.argv[4] + pathToROSws = sys.argv[5] + gitRepo = sys.argv[6] + + ros2Extractor = ros2Runner(inputPkgName=pkgName, + inputNodeName=NodeName, + typeOfRequest=typeOfRequest, + pathToROSws=pathToROSws, + gitRepo=gitRepo, + outputDir=pathToOutput) + ros2Extractor.extractorRun(True) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/haros_runner.sh b/haros_runner.sh index e313dac..e9025fb 100755 --- a/haros_runner.sh +++ b/haros_runner.sh @@ -86,9 +86,9 @@ then then if [ "${2}" = "--all" ] then - python3 /ros_model_extractor.py --package "$1" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo -a >> extractor.log + python3 /ros_model_extractor.py --package "$1" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo -a >> ${4}/extractor.log else - python3 /ros_model_extractor.py --package "$1" --name "$2" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo>> extractor.log + python3 /ros_model_extractor.py --package "$1" --name "$2" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo>> ${4}/extractor.log fi #cat extractor.log else @@ -102,7 +102,7 @@ fi echo "~~~~~~~~~~~" echo "Extraction finished. See the following report:" -cat extractor.log +cat ${4}/extractor.log echo "~~~~~~~~~~~" echo "###########" @@ -118,4 +118,5 @@ echo "###########" done ## Clean and finish -rm -rf ${5}/src/* +# rm -rf ${5}/src/* +# rm -rf ${4}/* diff --git a/rosHumble_model_extractor_scripts/Ros_Extractor.py b/rosHumble_model_extractor_scripts/Ros_Extractor.py new file mode 100644 index 0000000..3519be5 --- /dev/null +++ b/rosHumble_model_extractor_scripts/Ros_Extractor.py @@ -0,0 +1,435 @@ +#!/usr/bin/env python +# +# Copyright 2019 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import argparse +import subprocess +from ros_model_generator.rosmodel_generator import RosModelGenerator +import ros_metamodels.ros_metamodel_core as RosModelMetamodel + +import rospkg +from copy import deepcopy +#import ament_index_python +from haros.extractor import NodeExtractor, RoscppExtractor, RospyExtractor +from haros.metamodel import Node, Package, RosName, SourceFile +from haros.launch_parser import LaunchParser, LaunchParserError, NodeTag +from haros.cmake_parser import RosCMakeParser +from bonsai.analysis import CodeQuery, resolve_expression + +try: + from bonsai.cpp.clang_parser import CppAstParser +except ImportError: + CppAstParser = None +from bonsai.py.py_parser import PyAstParser + +class RosExtractor(): + def launch(self): + self.parse_arg() + ws = self.args.worspace_path + + #BONSAI PARSER + parser = CppAstParser(workspace = ws) + parser.set_library_path("/usr/lib/llvm-14/lib") + parser.set_standard_includes("/usr/lib/llvm-14/lib/clang/14.0.0/include") + + if (self.args.node): + self.extract_node(self.args.name, self.args.name, self.args.package_name, None, ws, None) + def extract_node(self, name, node_name, pkg_name, ns, ws, rossystem): + self.pkg = Package(pkg_name) + if os.environ.get("ROS_VERSION") == "1": + rospack = rospkg.RosPack() + self.pkg.path = rospack.get_path(pkg_name) + self.pkg_type="CatkinPackage" + elif os.environ.get("ROS_VERSION") == "2": + self.pkg.path= self.args.path_to_src + self.pkg_type="AmentPackage" + roscomponent = None + + #HAROS NODE EXTRACTOR + + srcdir = self.pkg.path[len(ws):] + srcdir = os.path.join(ws, srcdir.split(os.sep, 1)[0]) + bindir = os.path.join(ws, "build") + #HAROS CMAKE PARSER + parser = RosCMakeParser(srcdir, bindir, pkgs = [self.pkg]) + model_str = "" + if os.path.isfile(os.path.join(self.pkg.path, "CMakeLists.txt")): + parser.parse(os.path.join(self.pkg.path, "CMakeLists.txt")) + node_name_dict = deepcopy(parser.executables) + node_name_dict.update(deepcopy(parser.libraries)) + for target in node_name_dict.values(): + print("INFO: Found artifact: "+target.output_name) + if (self.args.a): + node_name = target.output_name + node = Node(node_name, self.pkg, rosname=RosName(node_name)) + for file_in in target.files: + full_path = file_in + relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] + file_name = full_path.rsplit('/', 1)[-1] + source_file = SourceFile(file_name, relative_path , self.pkg) + node.source_files.append(source_file) + else: + if target.output_name == node_name: + node = Node(node_name, self.pkg, rosname=RosName(node_name)) + for file_in in target.files: + full_path = file_in + relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] + file_name = full_path.rsplit('/', 1)[-1] + source_file = SourceFile(file_name, relative_path , self.pkg) + node.source_files.append(source_file) + else: + continue + if node.language == "cpp": + parser = CppAstParser(workspace = ws) + analysis = RoscppExtractor(self.pkg, ws) + + db_dir = os.path.join(ws, "build") + if os.path.isfile(os.path.join(db_dir, "compile_commands.json")): + parser.set_database(db_dir) + else: + print("The compile_commands.json file can't be found") + if node.language == "python": + parser = PyAstParser(workspace = ws) + analysis = RospyExtractor(self.pkg, ws) + #node.source_tree = parser.global_scope + for sf in node.source_files: + try: + if parser.parse(sf.path) is not None: + # ROS MODEL EXTRACT PRIMITIVES + if node.language == "python": + node_name=node_name.replace(".py","") + RosModel_node=RosModelMetamodel.Node(node_name) + try: + self.extract_primitives(node, parser, analysis, RosModel_node, roscomponent, pkg_name, node_name, node_name) + # SAVE ROS MODEL + ros_model = RosModelGenerator() + ros_model.create_model_from_node(self.pkg.name,node_name, RosModel_node, self.args.repo, self.pkg_type) + print("Save model in:") + print(self.args.model_path+"/"+node_name+".ros") + model_str = ros_model.generate_ros_model(self.args.model_path+"/"+node_name+".ros") + except error: + print("The interfaces can't be extracted "+error) + else: + print("The model couldn't be extracted") + except: + pass + if rossystem is not None and roscomponent is not None: + rossystem.add_component(roscomponent) + if self.args.output: + print(model_str) + + def transform_type(self, param_type): + if os.environ.get("ROS_VERSION") == "2": + param_type=str(param_type) + param_type=param_type[param_type.find("[")+1:param_type.find("]")] + if param_type == 'double': + return 'Double' + elif param_type == 'bool': + return 'Boolean' + elif param_type == 'int' or param_type == 'long': + return 'Integer' + elif (param_type == 'str' or 'basic_string' in param_type or param_type == 'std::string'): + return 'String' + #elif param_type == 'yaml': + #return 'Struct' + #elif 'std::vector' in param_type: + #return 'List' + else: + return None + + + def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent, pkg_name, node_name, art_name): + gs = parser.global_scope + node.source_tree = parser.global_scope + if os.environ.get("ROS_VERSION") == "1": + if node.language == "cpp": + #print(CodeQuery(gs).all_calls.get()) + for call in (CodeQuery(gs).all_calls.where_name("SimpleActionServer").get()): + if len(call.arguments) > 0: + name = analysis._extract_action(call) + action_type = analysis._extract_action_type(call).split("_<",1)[0] + RosModel_node.add_action_server(name,action_type.replace("/",".")) + #roscomponent.add_interface(name,"actsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) + for call in (CodeQuery(gs).all_calls.where_name("SimpleActionClient").get()): + if len(call.arguments) > 0: + name = analysis._extract_action(call) + action_type = analysis._extract_action_type(call).split("_<",1)[0] + RosModel_node.add_action_client.append(name,action_type.replace("/",".")) + #roscomponent.add_interface(name,"actcls", str(pkg_name)+"."+str(art_name)+"."+str(node_name)+"."+str(name)) + for call in (CodeQuery(gs).all_calls.where_name("advertise").where_result("ros::Publisher").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + RosModel_node.add_publisher(name, msg_type.replace("/",".")) + #roscomponent.add_interface(name,"pubs", pkg_name+"."+art_name+"."+node_name+"."+name) + for call in (CodeQuery(gs).all_calls.where_name("subscribe").where_result("ros::Subscriber").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + RosModel_node.add_subscriber(name, msg_type.replace("/",".")) + #roscomponent.add_interface(name,"subs", pkg_name+"."+art_name+"."+node_name+"."+name) + for call in (CodeQuery(gs).all_calls.where_name("advertiseService").where_result("ros::ServiceServer").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call) + srv_type = analysis._extract_message_type(call) + RosModel_node.add_service_server(name, srv_type.replace("/",".").replace("Request","")) + #roscomponent.add_interface(name,"srvsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) + for call in (CodeQuery(gs).all_calls.where_name("serviceClient").where_result("ros::ServiceClient").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call) + srv_type = analysis._extract_message_type(call) + RosModel_node.add_service_client(name, srv_type.replace("/",".").replace("Response","")) + #roscomponent.add_interface(name,"srvcls", pkg_name+"."+art_name+"."+node_name+"."+name) + + #PARAMETERS nhg:this needs review + nh_prefix = "c:@N@ros@S@NodeHandle@" + gets = ("getParam", "getParamCached", "param") + reads = gets + ("hasParam", "searchParam") + sets = ("setParam",) + writes = sets + ("deleteParam",) + for call in CodeQuery(gs).all_calls.where_name(reads).get(): + if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): + param_type = default_value = None + param_name = analysis._extract_topic(call) + if call.name in gets: + param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) + if call.name == "param": + if len(call.arguments) > 2: + default_value = analysis._extract_param_value( call, arg_pos=2) + elif len(call.arguments) == 2: + default_value = analysis._extract_param_value( call, arg_pos=1) + if not ((default_value is None or default_value == "") and param_type is None): + RosModel_node.add_parameter(param_name, default_value, param_type, None) + + for call in CodeQuery(gs).all_calls.where_name(writes).get(): + if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): + param_type = value = None + param_name = analysis._extract_topic(call) + if len(call.arguments) >= 2 and call.name in sets: + param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) + value = analysis._extract_param_value(call, arg_pos=1) + if not ((default_value is None or default_value == "") and param_type is None): + RosModel_node.add_parameter(param_name, default_value, param_type, None) + ros_prefix = "c:@N@ros@N@param@" + gets = ("get", "getCached", "param") + reads = gets + ("has",) + sets = ("set",) + writes = sets + ("del",) + for call in CodeQuery(gs).all_calls.where_name(reads).get(): + if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): + param_type = default_value = None + param_name = analysis._extract_topic(call) + if call.name == "param": + if call.name in gets: + param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) + if len(call.arguments) > 2: + default_value = analysis._extract_param_value(call, arg_pos=2) + elif len(call.arguments) == 2: + default_value = analysis._extract_param_value(call, arg_pos=1) + if not ((default_value is None or default_value == "") and param_type is None): + RosModel_node.add_parameter(param_name, default_value, param_type, None) + for call in CodeQuery(gs).all_calls.where_name(writes).get(): + if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): + param_type = value = None + if len(call.arguments) >= 2 and call.name in sets: + param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) + value = analysis._extract_param_value(call, arg_pos=1) + param_name = analysis._extract_topic(call) + if not ((default_value is None or default_value == "") and param_type is None): + RosModel_node.add_parameter(param_name, default_value, param_type, None) + + #PYTHON nhg:this needs review + if node.language == "python": + msgs_list = [] + pkgs_list = [] + for imp_name in parser.imported_names_list: + s = str(imp_name) + if "msg" in s or "srv" in s: + ss = s.split(".") + if len(ss) < 2: + continue + if ss[-1] == "msg" or ss[-1] == "srv": + pkgs_list.append(ss[0]) + elif ss[1] == "msg" or ss[1] == "srv": + msgs_list.append((ss[0], ss[2])) + else: + log.debug(("Python import with 'msg' or 'srv', " + "but unable to process it: ") + + s) + for call in CodeQuery(gs).all_calls.get(): + if "rospy.Publisher" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) + #queue_size = analysis._extract_queue_size(call ) + RosModel_node.add_publisher(name, msg_type.replace("/",".")) + #roscomponent.add_interface(name,"pubs", pkg_name+"."+art_name+"."+node_name+"."+name) + if "rospy.Subscriber" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) + #queue_size = analysis._extract_queue_size(call ) + RosModel_node.add_subscriber(name, msg_type.replace("/",".")) + #roscomponent.add_interface(name,"subs", pkg_name+"."+art_name+"."+node_name+"."+name) + if "rospy.Service" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call, '', msgs_list) + RosModel_node.add_service_server(name, srv_type.replace("/",".").replace("Request","")) + #roscomponent.add_interface(name,"srvsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) + if "rospy.ServiceProxy" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call, '', msgs_list) + RosModel_node.add_service_client(name, srv_type.replace("/",".").replace("Response","")) + #roscomponent.add_interface(name,"srvcls", pkg_name+"."+art_name+"."+node_name+"."+name) + #if "rospy.get_param" in str(call): + # print("PARAM GET:") + # print(call) + if "rospy.set_param" in str(call): + param_name = analysis._extract_topic(call, topic_pos=0) + param_type = default_value = None + default_value = resolve_expression(call.arguments[1]) + RosModel_node.add_parameter(param_name.replace(".","/"), default_value , param_type, None) + + if os.environ.get("ROS_VERSION") == "2": + #ROS2 + if node.language == "cpp": + for call in (CodeQuery(gs).all_calls.get()): + + if "Publisher" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or msg_type!="?": + RosModel_node.add_publisher(name, msg_type.replace("/",".").replace(".msg","")) + + if "Subscriber" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or msg_type!="?": + RosModel_node.add_subscriber(name, msg_type.replace("/",".").replace(".msg","")) + + if "Service" in str(call) or "::srv::" in str(call.function): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + print(name + " " + srv_type) + if name!="?" or srv_type!="?": + RosModel_node.add_service_server(name, srv_type.replace("/",".").replace(".srv","")) + + if "Client" in str(call) and "::srv::" in str(call): + #print(call) + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + print(name + " " + srv_type) + if name!="?" or srv_type!="?": + RosModel_node.add_service_client(name, srv_type.replace("/",".").replace(".srv","")) + if "Client" in str(call) and "::action::" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + act_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or act_type!="?": + RosModel_node.add_action_client(name, act_type.replace("/",".").replace(".action","")) + if "Server" in str(call) and "::action::" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + act_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or act_type!="?": + RosModel_node.add_action_server(name, act_type.replace("/",".").replace(".action","")) + #PARAMETERS ROS2 + params=[] + written_params=[] + for call in (CodeQuery(gs).all_calls.get()): + param_prefix = "c:@N@rclcpp@S@Node@F@" + declare_params = ("get_parameter","declare_parameter") + if (call.full_name.startswith("rclcpp::Node") or (isinstance(call.reference, str)) and call.reference.startswith(param_prefix)): + param_type = default_value = None + if(call.name in declare_params) and len(call.arguments) > 1: + param_name = analysis._extract_topic(call, topic_pos=0) + if len(call.arguments) == 2: + param_type= self.transform_type(resolve_expression(call.arguments[1])) + params.append((param_name, param_type)) + elif len(call.arguments) > 2 and not ('[rclcpp::ParameterValue] (default)' in str(resolve_expression(call.arguments[1]))): + default_value = resolve_expression(call.arguments[1]) + param_type = self.transform_type(resolve_expression(call)) + params.append((param_name, param_type, default_value)) + for parameter in params: + param_name_ = param_type_ = default_value_ = None + if len(parameter) > 2: + param_name_, param_type_, default_value_ = parameter + if not ((default_value_ is None or default_value_ == "") and param_type_ is None): + RosModel_node.add_parameter(param_name_.replace(".","/"), default_value_ , param_type_, None) + written_params.append(param_name_) + elif len(parameter) == 2: + param_name_, param_type_ = parameter + if not (param_type_ is None) and not (param_name_ in written_params): + RosModel_node.add_parameter(param_name_.replace(".","/"), default_value_ , param_type_, None) + elif node.language == "python": + msgs_list = [] + pkgs_list = [] + for imp_name in parser.imported_names_list: + s = str(imp_name) + print(imp_name) + if "msg" in s or "srv" in s: + ss = s.split(".") + if len(ss) < 2: + continue + if ss[-1] == "msg" or ss[-1] == "srv": + pkgs_list.append(ss[0]) + elif ss[1] == "msg" or ss[1] == "srv": + msgs_list.append((ss[0], ss[2])) + else: + log.debug(("Python import with 'msg' or 'srv', " + "but unable to process it: ") + + s) + for call in (CodeQuery(gs).all_calls.get()): + print(call) + if "create_subscription" in str(call): + print("Found subcription") + + + + def parse_arg(self): + parser = argparse.ArgumentParser() + mutually_exclusive = parser.add_mutually_exclusive_group() + mutually_exclusive.add_argument('--node', '-n', help="node analyse", action='store_true') + mutually_exclusive.add_argument('--launch', '-l', help="launch analyse", action='store_true') + parser.add_argument('--model-path', help='path to the folder in which the model files should be saved', + default='./', + nargs='?', const='./') + parser.add_argument('--output', help='print the model output') + parser.add_argument('--package', required=True, dest='package_name') + parser.add_argument('--name', required=False, dest='name') + parser.add_argument('--ws', required=True, dest='worspace_path') + parser.add_argument('--path-to-src', required=False, dest='path_to_src') + parser.add_argument('--repo', required=False, dest='repo') + parser.add_argument('-a', action='store_true') + self.args = parser.parse_args() + + diff --git a/rosHumble_model_extractor_scripts/file_handling.py b/rosHumble_model_extractor_scripts/file_handling.py new file mode 100644 index 0000000..5f3f60d --- /dev/null +++ b/rosHumble_model_extractor_scripts/file_handling.py @@ -0,0 +1,33 @@ +#!/usr/bin/env python +import sys +import os +import subprocess +from datetime import datetime +import shutil + +import glob + + +class fileHandling: + def __init__(self): + pass + + @staticmethod + def wrtieToFile(fileNameWithPath, contentToWrite): + with open(fileNameWithPath, "w") as file: + file.write(contentToWrite) + print("-----Write to {} completed-----".format(fileNameWithPath)) + + @staticmethod + def showFileContent(fileNameWithPath): + + print("-----Show the ROS model of {}".format(fileNameWithPath)) + with open(fileNameWithPath,'r') as file: + print(file.read()) + + @staticmethod + def filterFileNamesByExtension(rootDir, extensionToSearch): + + fileList = glob.glob(os.path.join(rootDir, '**', "*.{}".format(extensionToSearch)), recursive=True) + + return fileList \ No newline at end of file diff --git a/rosHumble_model_extractor_scripts/ros2runner.py b/rosHumble_model_extractor_scripts/ros2runner.py new file mode 100644 index 0000000..34b22e7 --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros2runner.py @@ -0,0 +1,119 @@ +#!/usr/bin/env python +import sys +import os +import subprocess +from datetime import datetime +import shutil +from rosHumble_model_extractor_scripts.ros_pkg_runner import rosPkgRunner +from rosHumble_model_extractor_scripts.file_handling import fileHandling +import glob + + + +class ros2Runner(rosPkgRunner): + + ROS2setupBashFile = "install/setup.bash" + ROS2Version = 2 + ROS2PythonVer = 3 + + + def __init__(self, + inputPkgName:str, + inputNodeName:str, + typeOfRequest:str, + pathToROSws:str, + gitRepo:str, + outputDir:str): + + super().__init__(inputPkgName=inputPkgName, + inputNodeName=inputNodeName, + typeOfRequest=typeOfRequest, + pathToROSws=pathToROSws, + gitRepo=gitRepo, + setupBashFile=self.ROS2setupBashFile, + outputDir=outputDir) + + def runColconBuild(self,): + subprocess.run(["colcon", "build", "--cmake-args"]) #CMake Colcon Warning fix + + def runColconList(self,): + os.system('colcon list > /tmp/colcon_list.txt') + + + + def calcPathToPkgSRC(self, textList:str): + + os.system('cat /tmp/colcon_list.txt | grep "^$1" | awk \'{ print $2}\' > /tmp/colcon_list_update.txt') + path_to_src_code=subprocess.run(["cat","/tmp/colcon_list_update.txt"], stdout=subprocess.PIPE, text=True).stdout + path_to_src_code=path_to_src_code.split("\n") + + self.setPathToSRCcode([os.path.join(self.getPathToROSws(), item) for item in path_to_src_code]) + self.validatePathToSRCcodeList() + + def startInstallPkgProcedure(self): + + self.runCommonCmd() + self.runColconBuild() + self.runSetupBash() + colconList = self.runColconList() + self.calcPathToPkgSRC(colconList) + print("## ROS pkg install and Build complete") + + def callHarosPlugin(self,): + + self.harosInit() + + + for pkgName in self.getPathToSRCcode(): + currOutDir = os.path.join(self.outputDir, pkgName.split('/')[-1]) + fileName = os.path.join(os.path.dirname(__file__), self.getROSModelExtractorPluginFile()) + extractorFile = currOutDir+"/extractor.log" + extractorContent = None + + if(not os.path.isdir(currOutDir)): + os.makedirs(currOutDir) + + if self.getInputNodeName() == "--all": + extractorContent = subprocess.run(["python3", fileName, + "--package", str(self.getInputPkgName()), + "--"+str(self.getTypeOfRequest()), + "--model-path",str(currOutDir) , + "--ws", str(self.getPathToROSws()), + "--path-to-src", pkgName, + "--repo", str(self.getGitModelRepo()), + "-a"], stdout=subprocess.PIPE, text=True).stdout + else: + extractorContent = subprocess.run(["python3", fileName, + "--package", str(self.getInputPkgName()), + "--name", str(self.getInputNodeName()), + "--"+str(self.getTypeOfRequest()), + "--model-path",str(currOutDir), + "--ws", str(self.getPathToROSws()), + "--path-to-src", pkgName, + "--repo", str(self.getGitModelRepo()) + ], stdout=subprocess.PIPE, text=True).stdout + fileHandling.wrtieToFile(extractorFile, extractorContent) + + def extractorRun(self,clearFlag=False): + + + #1. clone the git repo at the required + rosPkgRunner.cloneRepo(self.getPathToROSws(), self.getGitRepo()) + + #2. Change working directory to ROS ws + os.chdir(self.getPathToROSws()) + + #3. Install dependencies and build + self.startInstallPkgProcedure() + + #4. Init the plugin + self.callHarosPlugin() + + #5. Show the ros models + self.showAllROSmodel() + + #6. Clear the ROS ws SRC folder + if clearFlag: + self.cleanUPsrc() + + print( "------------------Done---------------") \ No newline at end of file diff --git a/rosHumble_model_extractor_scripts/rosHumble_model_extractor.py b/rosHumble_model_extractor_scripts/rosHumble_model_extractor.py new file mode 100644 index 0000000..6b40980 --- /dev/null +++ b/rosHumble_model_extractor_scripts/rosHumble_model_extractor.py @@ -0,0 +1,58 @@ +#!/usr/bin/env python +# +# Copyright 2019 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from Ros_Extractor import RosExtractor + + +class RosInterface: + def __init__(self, name, ref): + self.name = name + self.ref = ref + +class ros_component: + def __init__(self, name, ns): + self.name = ns+name if ns else name + self.ns = ns + self.pubs = [] + self.subs = [] + self.srvsrvs = [] + self.srvcls = [] + self.actsrvs = [] + self.actcls = [] + def add_interface(self, name, interface_type, ref): + if interface_type == "pubs": + self.pubs.append(RosInterface(name,ref)) + if interface_type == "subs": + self.subs.append(RosInterface(name,ref)) + if interface_type == "srvsrvs": + self.srvsrvs.append(RosInterface(name,ref)) + if interface_type == "srvcls": + self.srvcls.append(RosInterface(name,ref)) + if interface_type == "actsrvs": + self.actsrvs.append(RosInterface(name,ref)) + if interface_type == "actcls": + self.actcls.append(RosInterface(name,ref)) + +def main(argv = None): + extractor = RosExtractor() + if extractor.launch(): + return 0 + return 1 + +if __name__== "__main__": + main() diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/__init__.py b/rosHumble_model_extractor_scripts/ros_metamodels/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/__init__.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..ab21f3a Binary files /dev/null and b/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/__init__.cpython-310.pyc differ diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/ros_metamodel_core.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/ros_metamodel_core.cpython-310.pyc new file mode 100644 index 0000000..1e9ac92 Binary files /dev/null and b/rosHumble_model_extractor_scripts/ros_metamodels/__pycache__/ros_metamodel_core.cpython-310.pyc differ diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/ros_metamodel_core.py b/rosHumble_model_extractor_scripts/ros_metamodels/ros_metamodel_core.py new file mode 100644 index 0000000..21b6f86 --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_metamodels/ros_metamodel_core.py @@ -0,0 +1,324 @@ +#!/usr/bin/env python +# +# Copyright 2021 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +## ROS MODEL METAMODEL ## + +class RosModel(object): + def __init__(self): + self.packages = list() + + def add_package(self, package): + self.packages.append(package) + + def dump_xtext_model(self): + ros_model_str = "PackageSet {\n" + for package in self.packages: + ros_model_str += package.dump_xtext_model() + ",\n" + ros_model_str = ros_model_str[:-2] + ros_model_str += "}" + return ros_model_str + +class Package(object): + def __init__(self, name, repo=None, pkg_type="CatkinPackage"): + self.name = name + self.artifacts=ArtifactSet() + self.repo = GitRepo(repo) + if pkg_type not in ["CatkinPackage","AmentPackage","Package"]: + print("\n ERROR: Invalid package type given, supported types are CatkinPackage(for ROS1), AmentPackage(for ROS2) or Package(for non-ROS packages)\n") + return + else: + self.pkg_type = pkg_type + + def add_artifact(self, artifact): + self.artifacts.add(artifact) + + def add_repo(self, repo): + self.repo = repo + + def dump_xtext_model(self): + ros_model_str = " "+self.pkg_type+" "+self.name+" {\n" + ros_model_str += self.repo.dump_xtext_model() + ros_model_str += self.artifacts.dump_xtext_model() + ros_model_str += "}" + return ros_model_str + +class ArtifactSet(set): + def get_list(self): + return [x.get_dict() for x in self] + + def dump_xtext_model(self): + if len(self) == 0: + return "" + str_ = "" + for elem in self: + str_ += elem.dump_xtext_model() + ",\n" + str_ = str_[:-2] + return str_ + +class Artifact(object): + def __init__(self, name, node): + self.name = name + self.node=node + + def dump_xtext_model(self): + ros_model_str = " Artifact "+self.name+" {\n" + ros_model_str += self.node.dump_xtext_model() + ros_model_str += "}" + return ros_model_str + +class GitRepo(object): + def __init__(self, repo): + self.repo=repo + + def dump_xtext_model(self): + if (self.repo != None): + ros_model_str = ' FromGitRepo "'+self.repo+'" \n' + return ros_model_str + else: + return "" + +class Node(object): + def __init__(self, name): + self.name = name + self.action_clients = InterfaceSet() + self.action_servers = InterfaceSet() + self.publishers = InterfaceSet() + self.subscribers = InterfaceSet() + self.service_clients = InterfaceSet() + self.service_servers = InterfaceSet() + self.params = ParameterSet() + + def add_publisher(self, name, topic_type): + self.publishers.add(Interface(name,topic_type)) + def add_subscriber(self, name, topic_type): + self.subscribers.add(Interface(name, topic_type)) + + def add_service_server(self, name, srv_type): + self.service_servers.add(Interface(name, srv_type)) + def add_service_client(self, name, srv_type): + self.service_clients.add(Interface(name, srv_type)) + + def add_action_client(self, name, act_type): + self.action_clients.add(Interface(name, act_type)) + def add_action_server(self, name, act_type): + self.action_servers.add(Interface(name, act_type)) + + def add_parameter(self, name, value, type, default, set_value=True): + self.params.add(Parameter(name, value, type, default, print_value=set_value)) + + def dump_xtext_model(self): + ros_model_str = " Node { name " + self.name + ros_model_str += self.service_servers.dump_xtext_model( + " ", "ServiceServer", "service", "ServiceServers") + ros_model_str += self.service_clients.dump_xtext_model( + " ", "ServiceClient", "service", "ServiceClients") + ros_model_str += self.publishers.dump_xtext_model( + " ", "Publisher", "message", "Publishers") + ros_model_str += self.subscribers.dump_xtext_model( + " ", "Subscriber", "message", "Subscribers") + ros_model_str += self.action_servers.dump_xtext_model( + " ", "ActionServer", "action", "ActionServers") + ros_model_str += self.action_clients.dump_xtext_model( + " ", "ActionClient", "action", "ActionClients") + ros_model_str += self.params.dump_xtext_model( + " ", "Parameters", "Parameters") + ros_model_str += "}\n" + return ros_model_str + +class Interface(object): + def __init__(self, name, type, namespace=""): + self.fullname = name + self.namespace = namespace + self.name = name[len(self.namespace)-1:] + self.type = type + + def dump_xtext_model(self, indent, name_type, interface_type): + return ("%s%s { name '%s' %s '%s'}") % ( + indent, name_type, self.fullname, interface_type, self.type.replace("/", ".")) + +class Parameter(object): + def __init__(self, name, value=None, type=None, default=None, namespace="", print_value=True): + self.fullname = name + self.namespace = namespace + self.name = name[len(self.namespace)-1:] + self.value = value + self.default = default + self.type = self.get_type(value, default, type) + self.count = 0 + self.print_value = print_value + + def get_type(self, value, default=None, given_type=None): + if given_type != None: + return given_type + elif value!=None: + return self.get_type_from_value(value) + elif default!=None: + return self.get_type_from_value(default) + else: + return '' + + def get_type_from_value(self, value): + param_type = type(value) + param_type = (str(param_type)).replace("<", "").replace("class", "").replace("type", "").replace(" '", "").replace("'>", "") + if param_type == 'float': + return 'Double' + elif param_type == 'bool': + return 'Boolean' + elif param_type == 'int': + return 'Integer' + elif param_type == 'str': + return 'String' + elif param_type == 'list': + if type(value[0]) == dict: + return 'Struc' + else: + return 'List' + elif param_type == 'dict': + return 'Struc' + else: + return param_type + + def set_value(self, value, indent): + str_param_value = "" + if self.type == "String": + str_param_value += "'"+value+"'" + elif self.type == "Boolean": + str_param_value += str(value).lower() + elif self.type == "List": + str_param_value += str(value).replace( + "[", "{").replace("]", "}") + elif self.type == 'Struc': + str_param_value += self.value_struct(value[0], indent+" ") + else: + str_param_value += str(value) + return str_param_value + + def types_struct(self, struct_dict, indent): + str_param = "{\n" + indent_new = indent+" " + for struct_element in struct_dict: + sub_name = struct_element + sub_value = struct_dict[struct_element] + sub_type = self.get_type(sub_value) + str_param += "%s'%s' %s" % (indent_new, sub_name, sub_type) + if sub_type == 'List': + str_param += self.form_list(sub_value) + if isinstance(sub_value, dict): + str_param += self.types_struct( + struct_dict[struct_element], indent_new) + str_param += ",\n" + str_param = str_param[:-2] + str_param += "}" + indent_new = "" + return str_param + + def value_struct(self, struct_dict, indent): + str_param = "{\n" + indent_new = indent+" " + for struct_element in struct_dict: + sub_name = struct_element + sub_value = struct_dict[struct_element] + sub_type = self.get_type(sub_value) + str_param += "%s{ '%s' { value " % (indent_new, sub_name) + if sub_type == "String": + sub_value = "'"+sub_value+"'" + if sub_type == 'List': + sub_value = str(sub_value).replace( + "[", "{").replace("]", "}").replace("{{", "{").replace("}}", "}") + if sub_type == "Boolean": + sub_value = str(sub_value).lower() + if isinstance(sub_value, dict): + str_param += self.value_struct( + struct_dict[struct_element], indent_new) + self.count = self.count + 1 + else: + str_param += "%s}}" % (sub_value) + str_param += ",\n" + str_param = str_param[:-2] + str_param += "}" + if self.count == 1: + str_param += "}}" + self.count = self.count - 1 + indent_new = "" + return str_param + + def form_list(self, value_in): + str_param = "{" + for i in value_in: + str_param += self.get_type(i) + if self.get_type(i) == "List": + str_param += self.form_list(i) + str_param += "," + str_param = str_param[:-1] + str_param += "}" + return str_param + + def get_dict(self): + return {"Value": self.value, "Fullname": self.fullname, + "Namespace": self.namespace, "Name": self.name} + + def dump_xtext_model(self, indent="", value=""): + str_param = "%sParameter { name '%s' type %s " % ( + indent, self.fullname, self.type) + if self.type == 'Struct': + str_param += self.types_struct(self.value[0], indent) + #str_param = str_param[:-2] + if self.type == 'List': + if self.value: + str_param += self.form_list(self.value) + elif self.default: + str_param += self.form_list(self.default) + if self.default and self.print_value: + str_param += ' default ' + self.set_value(self.default, indent) + if self.value and self.print_value: + str_param += 'value ' + self.set_value(self.value, indent) + str_param += "}" + return str_param + +class InterfaceSet(set): + def get_list(self): + return [x.get_dict() for x in self] + + def dump_xtext_model(self, indent="", name_type="", interface_type="", name_block=""): + if len(self) == 0: + return "" + str_ = ("\n%s%s {\n") % (indent, name_block) + for elem in self: + str_ += elem.dump_xtext_model(indent+" ", name_type, interface_type) + ",\n" + str_ = str_[:-2] + str_ += "}" + return str_ + +class ParameterSet(set): + def get_list(self): + return [x.get_dict() for x in self] + + def iteritems(self): + return [(x.fullname, x.type) for x in self] + + def iterkeys(self): + return [x.fullname for x in self] + + def dump_xtext_model(self, indent="", value="", name_block=""): + if len(self) == 0: + return "" + str_ = ("\n%s%s {\n") % (indent, name_block) + for elem in self: + str_ += elem.dump_xtext_model(indent+" ", value) + ",\n" + str_ = str_[:-2] + str_ += "}" + return str_ + diff --git a/rosHumble_model_extractor_scripts/ros_metamodels/rossystem_metamodel_core.py b/rosHumble_model_extractor_scripts/ros_metamodels/rossystem_metamodel_core.py new file mode 100644 index 0000000..41d38ae --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_metamodels/rossystem_metamodel_core.py @@ -0,0 +1,219 @@ +#!/usr/bin/env python +# +# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from ros_metamodels.ros_metamodel_core import ParameterSet, Parameter + +## ROS SYSTEM METAMODEL ## +class RosSystem(object): + def __init__(self, name, package): + self.name = name + self.package = package + self.components = ComponentSet() + self.params = ParameterSet() + + def dump_xtext_model(self): + system_model_str = "RosSystem { Name '%s'\n" % self.name + system_model_str += " RosComponents ( \n" + system_model_str += self.components.dump_xtext_model(self.package) + system_model_str = system_model_str[:-2] + system_model_str += "}\n)" + system_model_str += self.params.dump_xtext_model( + " ", "Parameters", "Parameters") + system_model_str += "\n}" + + return system_model_str + + +class Component(object): + def __init__(self, name): + self.name = name + self.action_clients = RosInterfaceSet() + self.action_servers = RosInterfaceSet() + self.publishers = RosInterfaceSet() + self.subscribers = RosInterfaceSet() + self.service_clients = RosInterfaceSet() + self.service_servers = RosInterfaceSet() + self.params = RosParameterSet() + + def dump_xtext_model(self, package=""): + system_model_str = " ComponentInterface { name '" + \ + self.name+"'\n" + system_model_str += self.publishers.dump_xtext_model( + " ", "Publishers", "Publisher", self.name, package) + system_model_str += self.subscribers.dump_xtext_model( + " ", "Subscribers", "Subscriber", self.name, package) + system_model_str += self.service_servers.dump_xtext_model( + " ", "SrvServers", "Server", self.name, package, "ServiceServer") + system_model_str += self.action_servers.dump_xtext_model( + " ", "ActionServers", "Server", self.name, package, "ActionServer") + system_model_str += self.action_clients.dump_xtext_model( + " ", "ActionClients", "Client", self.name, package, "ActionClient") + system_model_str += self.params.dump_xtext_model( + " ", "Parameters", "Parameter", self.name, package) + system_model_str += " },\n" + return system_model_str + +class RosInterface(object): + def __init__(self, name, reference, namespace=""): + self.resolved = name + self.namespace = namespace + self.minimal = name[len(self.namespace)-1:] + self.reference = reference + + def dump_xtext_model(self, indent, name_type, interface_type): + return ("%s%s { name '%s' %s '%s'}") % ( + indent, name_type, self.resolved, interface_type, self.reference.replace("/", ".")) + +class RosParameter(object): + def __init__(self, name, reference, value, namespace=""): + self.resolved = name + self.namespace = namespace + self.minimal = name[len(self.namespace)-1:] + self.value = value + self.reference = reference + self.count = 0 + + def get_type(self, value): + itype = type(value) + itype = (str(itype)).replace("<", "").replace("class", "").replace("type", "").replace(" '", "").replace("'>", "") + if itype == 'float': + return 'Double' + elif itype == 'bool': + return 'Boolean' + elif itype == 'int': + return 'Integer' + elif itype == 'str': + return 'String' + elif itype == 'list': + if type(value[0]) == dict: + return 'Struc' + else: + return 'List' + elif itype == 'dict': + return 'Struc' + else: + return itype + + def set_value(self, value, indent): + itype = self.get_type(value) + str_param_value = "" + if itype == "String": + str_param_value += "'"+self.value+"'" + elif itype == "Boolean": + str_param_value += str(self.value).lower() + elif itype == "List": + str_param_value += str(self.value).replace( + "[", "{").replace("]", "}") + elif itype == 'Struc': + str_param_value += self.value_struct(self.value[0], indent+" ") + else: + str_param_value += str(value) + return str_param_value + + def value_struct(self, struct_dict, indent): + str_param = "{\n" + indent_new = indent+" " + for struct_element in struct_dict: + sub_name = struct_element + sub_value = struct_dict[struct_element] + sub_type = self.get_type(sub_value) + str_param += "%s{ '%s' { value " % (indent_new, sub_name) + if sub_type == "String": + sub_value = "'"+sub_value+"'" + if sub_type == 'List': + sub_value = str(sub_value).replace( + "[", "{").replace("]", "}").replace("{{", "{").replace("}}", "}") + if sub_type == "Boolean": + sub_value = str(sub_value).lower() + if isinstance(sub_value, dict): + str_param += self.value_struct( + struct_dict[struct_element], indent_new) + self.count = self.count + 1 + else: + str_param += "%s}}" % (sub_value) + str_param += ",\n" + str_param = str_param[:-2] + str_param += "}" + if self.count == 1: + str_param += "}}" + self.count = self.count - 1 + indent_new = "" + return str_param + +class ComponentSet(set): + def get_list(self): + return [x.get_dict() for x in self] + + def iteritems(self): + return [(x.resolved, x.itype) for x in self] + + def iterkeys(self): + return [x.resolved for x in self] + + def dump_xtext_model(self, package="", indent="", value="", name_block=""): + if len(self) == 0: + return "" + str_ = ("\n%s%s") % (indent, name_block) + for elem in self: + str_ += elem.dump_xtext_model(package) + "\n" + str_ = str_[:-2] + return str_ + +class RosInterfaceSet(set): + def get_list(self): + return [x.get_dict() for x in self] + + def iteritems(self): + return [(x.resolved, x.reference) for x in self] + + def iterkeys(self): + return [x.resolved for x in self] + + def dump_xtext_model(self, indent="", name_type="", name_type2="", node_name="", pkg_name="", name_type3=""): + if len(self) == 0: + return "" + if not name_type3: + name_type3 = name_type2 + str_ = ("%sRos%s {\n") % (indent, name_type) + for elem in self: + str_ += ("%s Ros%s '%s' {Ref%s '%s.%s.%s.%s'},\n") % ( + indent, name_type3, elem.resolved, name_type2, pkg_name, node_name, node_name, elem.resolved) + str_ = str_[:-2] + str_ += "}\n" + return str_ + +class RosParameterSet(set): + def get_list(self): + return [x.get_dict() for x in self] + + def iteritems(self): + return [(x.resolved, x.reference) for x in self] + + def iterkeys(self): + return [x.resolved for x in self] + + def dump_xtext_model(self, indent="", name_type="", name_type2="", node_name="", pkg_name="", name_type3=""): + if len(self) == 0: + return "" + if not name_type3: + name_type3 = name_type2 + str_ = ("%sRos%s {\n") % (indent, name_type) + for elem in self: + str_ += ("%s Ros%s '%s' {Ref%s '%s.%s.%s.%s' value %s},\n") % ( + indent, name_type3, elem.resolved, name_type2, pkg_name, node_name, node_name, elem.resolved, elem.set_value(elem.value, indent)) + str_ = str_[:-2] + str_ += "}\n" + return str_ diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/__init__.py b/rosHumble_model_extractor_scripts/ros_model_generator/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/__init__.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..0424860 Binary files /dev/null and b/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/__init__.cpython-310.pyc differ diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/rosmodel_generator.cpython-310.pyc b/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/rosmodel_generator.cpython-310.pyc new file mode 100644 index 0000000..02d7277 Binary files /dev/null and b/rosHumble_model_extractor_scripts/ros_model_generator/__pycache__/rosmodel_generator.cpython-310.pyc differ diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/rosmodel_generator.py b/rosHumble_model_extractor_scripts/ros_model_generator/rosmodel_generator.py new file mode 100644 index 0000000..3a00d78 --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_model_generator/rosmodel_generator.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +# +# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyparsing import * +import ros_metamodels.ros_metamodel_core as model + +class RosModelGenerator(object): + def __init__(self): + self.ros_model = model.RosModel() + + def create_model_from_node(self, package_name, artifact_name, node, repo=None, pkg_type="CatkinPackage"): + package = model.Package(package_name, repo, pkg_type) + gitrepo=model.GitRepo(repo) + artifact = model.Artifact(artifact_name, node) + package.add_artifact(artifact) + package.add_repo(gitrepo) + self.ros_model.add_package(package) + + def generate_ros_model(self, ros_model_file): + sucess, ros_model_str = self.create_ros_model() + with open(ros_model_file, 'w') as outfile: + outfile.write(ros_model_str) + + def generate_ros_model_list(self, components, ros_model_file): + sucess, ros_model_str = self.create_ros_model_list(components) + with open(ros_model_file, 'w') as outfile: + outfile.write(ros_model_str) + + def generate_ros_model_from_system(self, rossystem, package, ros_model_file, print_param_value=True): + sucess, ros_model_str = self.create_ros_model_from_system(package, rossystem, print_param_value) + with open(ros_model_file, 'w') as outfile: + outfile.write(ros_model_str) + + def create_ros_model(self): + ros_model_str = self.ros_model.dump_xtext_model() + return True, ros_model_str + + def create_ros_model_from_system(self, package_name, rossystem, print_param_value=True): + package = model.Package(package_name) + for component in rossystem.components: + node = model.Node(component.name) + + for param in component.params: + node.add_parameter(param.resolved, None, None, param.value, print_param_value) + + for pub, pub_type in component.publishers.iteritems(): + node.add_publisher(pub, pub_type) + + for sub, sub_type in component.subscribers.iteritems(): + node.add_subscriber(sub, sub_type) + + for serv, serv_type in component.service_servers.iteritems(): + node.add_service_server(serv, serv_type) + + for serv, serv_type in component.service_clients.iteritems(): + node.add_service_client(serv, serv_type) + + for action, action_type in component.action_clients.iteritems(): + node.add_action_client(action, action_type) + + for action, action_type in component.action_servers.iteritems(): + node.add_action_server(action, action_type) + + artifact = model.Artifact(node.name, node) + package.add_artifact(artifact) + + self.ros_model.add_package(package) + return self.create_ros_model() + + def create_ros_model_list(self, components): + for name in components: + if name == 'global_parameters': + continue + node = model.Node(name) + + if 'parameters' in components[name]: + parameters = components[name]['parameters'] + for param_name, param in parameters.items(): + node.add_parameter(param_name, None, None, param[0]) + + if 'publishers' in components[name]: + publishers = components[name]['publishers'] + for pub, pub_type in publishers.items(): + node.add_publisher(pub, pub_type) + + if 'subscribers' in components[name]: + subscribers = components[name]['subscribers'] + for sub, sub_type in subscribers.items(): + node.add_subscriber(sub, sub_type) + + if 'service_servers' in components[name]: + service_servers = components[name]['service_servers'] + for serv, serv_type in service_servers.items(): + node.add_service_server(serv, serv_type) + + if 'service_clients' in components[name]: + service_clients = components[name]['service_clients'] + for serv, serv_type in service_clients.items(): + node.add_service_client(serv, serv_type) + + if 'action_clients' in components[name]: + action_clients = components[name]['action_clients'] + for action, action_type in action_clients.items(): + node.add_action_client(action, action_type) + + if 'action_servers' in components[name]: + action_servers = components[name]['action_servers'] + for action, action_type in action_servers.items(): + node.add_action_server(action, action_type) + + self.create_model_from_node('my_ros_pkg',"test",node) + + return self.create_ros_model() + + +if __name__ == "__main__": + generator = RosModelGenerator() + try: + generator.generate_ros_model("/tmp/test") + except Exception as e: + print(e.args) diff --git a/rosHumble_model_extractor_scripts/ros_model_generator/rossystem_generator.py b/rosHumble_model_extractor_scripts/ros_model_generator/rossystem_generator.py new file mode 100644 index 0000000..6c4901d --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_model_generator/rossystem_generator.py @@ -0,0 +1,125 @@ +#!/usr/bin/env python +# +# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyparsing import * +import ros_metamodels.ros_metamodel_core as model +import ros_metamodels.rossystem_metamodel_core as system_model +import ros_model_generator.rosmodel_generator as model_generator + + +class RosSystemModelGenerator(object): + def __init__(self,name="", package=""): + self.system = system_model.RosSystem(name, package); + + def setSystemName(self, name): + self.system.name = name; + + def addParameter(self, name, value): + self.system.params.add(model.Parameter(name, value)) + + def addComponent(self, name): + self.system.components.add(system_model.Component(name)) + + def addComponent(self, component): + self.system.components.add(component) + + def dump_ros_system_model(self, rosystem_model_file): + sucess, ros_system_model_str = self.create_ros_system_model() + with open(rosystem_model_file, 'w') as outfile: + outfile.write(ros_system_model_str) + + def dump_ros_system_model_list(self, components, rosystem_model_file): + sucess, ros_system_model_str = self.create_ros_system_model_list(components) + with open(rosystem_model_file, 'w') as outfile: + outfile.write(ros_system_model_str) + + def create_ros_system_model(self): + ros_system_model_str = self.system.dump_xtext_model() + return True, ros_system_model_str + + def create_ros_system_model_list(self, components): + for name in components: + if name == 'global_parameters': + continue + component = system_model.Component(name) + + if 'parameters' in components[name]: + parameters = components[name]['parameters'] + for param_name, param in parameters.items(): + component.params.add(system_model.RosParameter(param_name, param[1], param[0])) + + if 'publishers' in components[name]: + publishers = components[name]['publishers'] + for pub, pub_type in publishers.items(): + component.publishers.add(system_model.RosInterface(pub, pub_type)) + + if 'subscribers' in components[name]: + subscribers = components[name]['subscribers'] + for sub, sub_type in subscribers.items(): + component.subscribers.add(system_model.RosInterface(sub, sub_type)) + + if 'service_servers' in components[name]: + service_servers = components[name]['service_servers'] + for serv, serv_type in service_servers.items(): + component.service_servers.add(system_model.RosInterface(serv, serv_type)) + + if 'service_clients' in components[name]: + service_clients = components[name]['service_clients'] + for serv, serv_type in service_clients.items(): + component.service_clients.add(system_model.RosInterface(serv, serv_type)) + + if 'action_clients' in components[name]: + action_clients = components[name]['action_clients'] + for action, action_type in action_clients.items(): + component.action_clients.add(system_model.RosInterface(action, action_type)) + + if 'action_servers' in components[name]: + action_servers = components[name]['action_servers'] + for action, action_type in action_servers.items(): + component.action_servers.add(system_model.RosInterface(action, action_type)) + + self.addComponent(component) + + if 'global_parameters' in components: + parameters = components['global_parameters'] + for name, param in parameters.items(): + self.addParameter(name, param[0]) + + return self.create_ros_system_model() + + def generate_ros_system_model(self, ros_system_model_file): + sucess, ros_system_model_str = self.create_ros_system_model() + with open(ros_system_model_file, 'w') as outfile: + outfile.write(ros_system_model_str) + + def generate_ros_system_model_list(self, components, ros_system_model_file, ros_model_file="", print_param_value=True): + sucess, ros_system_model_str = self.create_ros_system_model_list(components) + with open(ros_system_model_file, 'w') as outfile: + outfile.write(ros_system_model_str) + + if ros_model_file: + rosmodel_generator = model_generator.RosModelGenerator() + rosmodel_generator.generate_ros_model_from_system(self.system, self.system.package, ros_model_file, print_param_value) + + +if __name__ == "__main__": + generator = RosSystemModelGenerator() + try: + generator.dump_ros_system_model("/tmp/test") + except Exception as e: + print(e.args) + diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/__init__.py b/rosHumble_model_extractor_scripts/ros_model_parser/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/model_comparator.py b/rosHumble_model_extractor_scripts/ros_model_parser/model_comparator.py new file mode 100644 index 0000000..13a15e2 --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_model_parser/model_comparator.py @@ -0,0 +1,89 @@ +from pyparsing import * + + +def strip_slash(string): + return '{}'.format(string[1:] if string.startswith('/') else string) + +# find out missing and additional interfaces +# if both lists are empty, system is running fine +def compare_rossystem_models(model_ref, model_current): + # not sure of the performance of this method + set_ref = set((strip_slash(x.interface_name[0])) + for x in model_ref.interfaces) + set_current = set((strip_slash(x.interface_name[0])) + for x in model_current.interfaces) + + # similarly for all interfaces within the node? + # or only for topic connections? + # does LED's code capture topic connections? + ref_params = dict() + for interface in model_ref.interfaces: + for param in interface.parameters: + key = strip_slash(param.param_name[0]) + ref_params[key] = [param.param_value[0], + interface.interface_name[0]] + + current_params = dict() + for interface in model_current.interfaces: + for param in interface.parameters: + key = strip_slash(param.param_name[0]) + current_params[key] = [ + param.param_value[0], interface.interface_name[0]] + + incorrect_params = dict() + for key, value in ref_params.items(): + try: + current_value = current_params[key][0] + ref_value = ref_params[key][0] + + if (type(current_value) is ParseResults) & (type(ref_value) is ParseResults): + current_value = current_value.asList() + ref_value = ref_value.asList() + if (type(current_value) is str) & (type(ref_value) is str): + current_value = re.sub( + r"[\n\t\s]*", "", strip_slash(current_value)) + ref_value = re.sub( + r"[\n\t\s]*", "", strip_slash(ref_value)) + isEqual = current_value == ref_value + if not isEqual: + incorrect_params.setdefault(current_params[key][1], []) + incorrect_params[current_params[key] + [1]].append([key, current_value]) + except Exception as exc: + pass + + # returning missing_interfaces, additional_interfaces + return list(set_ref - set_current), list(set_current - set_ref), incorrect_params + +def _check_valid(interface_name, interface_type): + if interface_type == '?': + return False + # add more cases here if required + + return True + +def extract_common_ros(model_ref, model_current): + node_ref = list(model_ref.packages[0].artifacts)[0].node + node_current = list(model_current.packages[0].artifacts)[0].node + + set_ref_pub = set((strip_slash(pub.name[0]), pub.type[0]) for pub in node_ref.publishers if _check_valid(pub.name[0], pub.type[0])) + set_current_pub = set((strip_slash(pub.name[0]), pub.type[0]) for pub in node_current.publishers if _check_valid(pub.name[0], pub.type[0])) + + set_ref_sub = set((strip_slash(sub.name[0]), sub.type[0]) for sub in node_ref.subscribers if _check_valid(sub.name[0], sub.type[0])) + set_current_sub = set((strip_slash(sub.name[0]), sub.type[0]) for sub in node_current.subscribers if _check_valid(sub.name[0], sub.type[0])) + + set_ref_srv = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_ref.service_servers if _check_valid(srv.name[0], srv.type[0])) + set_current_srv = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_current.service_servers if _check_valid(srv.name[0], srv.type[0])) + + set_ref_srv_cli = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_ref.service_clients if _check_valid(srv.name[0], srv.type[0])) + set_current_srv_cli = set((strip_slash(srv.name[0]), srv.type[0]) for srv in node_current.service_clients if _check_valid(srv.name[0], srv.type[0])) + + set_ref_act = set((strip_slash(act.name[0]), act.type[0]) for act in node_ref.action_servers if _check_valid(act.name[0], act.type[0])) + set_current_act = set((strip_slash(act.name[0]), act.type[0]) for act in node_current.action_servers if _check_valid(act.name[0], act.type[0])) + + set_ref_act_cli = set((strip_slash(act.name[0]), act.type[0]) for act in node_ref.action_clients if _check_valid(act.name[0], act.type[0])) + set_current_act_cli = set((strip_slash(act.name[0]), act.type[0]) for act in node_current.action_clients if _check_valid(act.name[0], act.type[0])) + + return tuple(set_ref_pub.intersection(set_current_pub)), tuple(set_ref_sub.intersection(set_current_sub)), \ + tuple(set_ref_srv.intersection(set_current_srv)), tuple(set_ref_srv_cli.intersection(set_current_srv_cli)), \ + tuple(set_ref_act.intersection(set_current_act)), tuple(set_ref_act_cli.intersection(set_current_act_cli)) diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/rosmodel_parser.py b/rosHumble_model_extractor_scripts/ros_model_parser/rosmodel_parser.py new file mode 100644 index 0000000..24abd8a --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_model_parser/rosmodel_parser.py @@ -0,0 +1,199 @@ +#!/usr/bin/env python +# +# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyparsing import * +import ros_metamodels.ros_metamodel_core as rosmodel + +# TODO: extract nodes, topics, services, etc from 'result' +# Compute Connections + + +# stateless functions +def parseActionStr(string, location, tokens): + if((len(tokens[0]) == 1) and (type(tokens[0][0]) == str)): + return tokens[0][0] + + +def parseActionDict(string, location, tokens): + dict_list = list() + for toks in tokens: + param_dict = dict() + for tok in toks: + param_dict[tok[0]] = tok[1] + dict_list.append(param_dict) + return dict_list + + +class RosModelParser(object): + def __init__(self, model, isFile=True): + # OCB = Open Curly Bracket {} + # CCB = Close Curly Bracket } + # ORB = Open Round Bracket ( + # CRB = Close Round Bracket ) + # SQ = Single Quotes ' + # OSB = Open Square Bracket [ + # CSB = Close Square Bracket ] + + OCB, CCB, ORB, CRB, SQ, OSB, CSB = map(Suppress, "{}()'[]") + name = Optional(SQ) + Word(printables, + excludeChars="{},'") + Optional(SQ) + + real = Combine(Word(nums) + '.' + Word(nums)) + + listStr = Forward() + mapStr = Forward() + param_value = Forward() + + sglQStr = QuotedString("'", multiline=True) + string_value = Dict( + Group(sglQStr + ZeroOrMore(OCB + param_value + CCB))) + + string_value.setParseAction(parseActionStr) + values = (Combine(Optional("-") + real) | Combine(Optional("-") + Word(nums))).setParseAction( + lambda tokens: float(tokens[0])) | string_value | Keyword("false") | Keyword("true") | listStr | mapStr + + _packageSet = Keyword("PackageSet").suppress() + _package = Keyword("Package").suppress() + _catkin_pkg = Keyword("CatkinPackage").suppress() + _ament_pkg = Keyword("AmentPackage").suppress() + _git_repo = Keyword("FromGitRepo").suppress() + #_artifact = Keyword("artifact").suppress() + _artifacts = Keyword("Artifact").suppress() + #_node = Keyword("node").suppress() + _nodes = Keyword("Node").suppress() + _name = CaselessKeyword("name").suppress() + + # Types + _srv_type= Keyword("service").suppress() + _topic_type= Keyword("message").suppress() + _act_type = Keyword("action").suppress() + + # Service Server + _service_svrs= Keyword("ServiceServers").suppress() + _service_svr= Keyword("ServiceServer").suppress() + service_svr = Group( _service_svr + OCB + _name + name("name") + _srv_type + name("service") + CCB) + service_svrs = (_service_svrs + OCB + OneOrMore(service_svr + Optional(",").suppress()) + CCB) + + # Service Client + _service_clis= Keyword("ServiceClients").suppress() + _service_cli= Keyword("ServiceClient").suppress() + service_cli = Group( _service_cli + OCB + _name + name("name") + _srv_type + name("service") + CCB) + service_clis = (_service_clis + OCB + OneOrMore(service_cli + Optional(",").suppress()) + CCB) + + # Action Server + _action_svrs= Keyword("ActionServers").suppress() + _action_svr= Keyword("ActionServer").suppress() + action_svr = Group( _action_svr + OCB + _name + name("name") + _act_type + name("action") + CCB) + action_svrs = (_action_svrs + OCB + OneOrMore(action_svr + Optional(",").suppress()) + CCB) + + # Action Client + _action_clis= Keyword("ActionClients").suppress() + _action_cli= Keyword("ActionClient").suppress() + action_cli = Group( _action_cli + OCB + _name + name("name") + _act_type + name("action") + CCB) + action_clis = (_action_clis + OCB + OneOrMore(action_cli + Optional(",").suppress()) + CCB) + + # Publisher + _pubs= Keyword("Publishers").suppress() + _pub= Keyword("Publisher").suppress() + pub = Group( _pub + OCB + _name + name("name") + _topic_type + name("message") + CCB) + pubs = (_pubs + OCB + OneOrMore(pub + Optional(",").suppress()) + CCB) + + # Subscriber + _subs= Keyword("Subscribers").suppress() + _sub= Keyword("Subscriber").suppress() + sub = Group( _sub + OCB + _name + name("name") + _topic_type + name("message") + CCB) + subs = (_subs + OCB + OneOrMore(sub + Optional(",").suppress()) + CCB) + + # Parameter + _params= Keyword("Parameters").suppress() + _param= Keyword("Parameter").suppress() + _type= Keyword("type").suppress() + param = Group( _sub + OCB + _name + name("name") + _type + name("type") + CCB) + params = (_params + OCB + OneOrMore(param + Optional(",").suppress()) + CCB) + + self.rospkg_grammar = _packageSet + \ + OCB + \ + ( _catkin_pkg | _ament_pkg | _package ) + name("pkg_name") + OCB + \ + Optional(_git_repo + name("git_repo")) + \ + _artifacts + name("artifact_name") + OCB + \ + _nodes + OCB + _name + name("node_name") + \ + Optional(service_svrs)("svr_servers") + \ + Optional(pubs)("publishers") + \ + Optional(subs)("subscribers") + \ + Optional(service_clis)("svr_clients") + \ + Optional(action_svrs)("act_servers") + \ + Optional(action_clis)("act_clients") + \ + Optional(params)("parameters") + + self._isFile = isFile + self._model = model + + + def _parse_from_string(self): + self._result = self.rospkg_grammar.parseString(self._model) + + def _parse_from_file(self): + self._result = self.rospkg_grammar.parseFile(self._model) + + def parse(self): + self._result = ParseResults() + try: + if self._isFile: + self._parse_from_file() + else: + self._parse_from_string() + except Exception as e: + print(e.args) # Should set a default 'result'? + ros_model = rosmodel.RosModel() + ros_node = rosmodel.Node(self._result.get("node_name")) + + if self._result.get("svr_servers") is not None: + [ros_node.add_service_server(srv_ser.get("name"), srv_ser.get("service")) for srv_ser in self._result.get("svr_servers")] + if self._result.get("svr_clients") is not None: + [ros_node.add_service_client(srv_cli.get("name"), srv_cli.get("service")) for srv_cli in self._result.get("svr_clients")] + if self._result.get("act_servers") is not None: + [ros_node.add_action_server(act_ser.get("name"), act_ser.get("action")) for act_ser in self._result.get("act_servers")] + if self._result.get("act_clients") is not None: + [ros_node.add_action_client(act_cli.get("name"), act_cli.get("action")) for act_cli in self._result.get("act_clients")] + if self._result.get("publishers") is not None: + [ros_node.add_publisher(pub.get("name"), pub.get("message")) for pub in self._result.get("publishers")] + if self._result.get("subscribers") is not None: + [ros_node.add_subscriber(sub.get("name"), sub.get("message")) for sub in self._result.get("subscribers")] + if self._result.get("parameters") is not None: + [ros_node.add_parameter(param.get("name"), param.get("type")) for param in self._result.get("parameters")] + + ros_artifact = rosmodel.Artifact(self._result.get("artifact_name"),ros_node) + ros_package = rosmodel.Package(self._result.get("pkg_name")) + ros_package.add_artifact(ros_artifact) + ros_model.add_package(ros_package) + + return ros_model + + +if __name__ == "__main__": + import os + my_path = os.path.abspath(os.path.dirname(__file__)) + path = os.path.join( + my_path, "../../resources/cob_light.ros") + print(path) + + parser = RosModelParser(path) + try: + print(parser.parse().dump()) + except Exception as e: + print(e.args) + diff --git a/rosHumble_model_extractor_scripts/ros_model_parser/rossystem_parser.py b/rosHumble_model_extractor_scripts/ros_model_parser/rossystem_parser.py new file mode 100644 index 0000000..111c482 --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_model_parser/rossystem_parser.py @@ -0,0 +1,233 @@ +#!/usr/bin/env python +# +# Copyright 2020 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pprint +from pyparsing import * + +# TODO: extract nodes, topics, services, etc from 'result' +# Compute Connections + + +# stateless functions +def parseActionStr(string, location, tokens): + if((len(tokens[0]) == 1) and (type(tokens[0][0]) == str)): + return tokens[0][0] + + +def parseActionDict(string, location, tokens): + dict_list = list() + for toks in tokens: + param_dict = dict() + for tok in toks: + param_dict[tok[0]] = tok[1] + dict_list.append(param_dict) + return dict_list + + +class RosSystemModelParser(object): + def __init__(self, model, isFile=True): + # OCB = Open Curly Bracket { + # CCB = Close Curly Bracket } + # ORB = Open Round Bracket ( + # CRB = Close Round Bracket ) + # SQ = Single Quotes ' + # DQ = Double Quotes " + # OSB = Open Square Bracket [ + # CSB = Close Square Bracket ] + + + OCB, CCB, ORB, CRB, SQ, DQ, OSB, CSB = map(Suppress, "{}()'\"[]") + + name = Optional(SQ) + Optional(DQ) + Word(printables, + excludeChars="{},'") + Optional(SQ) + Optional(DQ) + + real = Combine(Word(nums) + '.' + Word(nums)) + + listStr = Forward() + mapStr = Forward() + param_value = Forward() + + sglQStr = QuotedString("'", multiline=True) + string_value = Dict( + Group(sglQStr + ZeroOrMore(OCB + param_value + CCB))) + + string_value.setParseAction(parseActionStr) + values = (Combine(Optional("-") + real) | Combine(Optional("-") + Word(nums))).setParseAction( + lambda tokens: float(tokens[0])) | string_value | Word(alphanums + "/-_.") | Keyword("false") | Keyword("true") | listStr | mapStr + + _system = Keyword("RosSystem").suppress() + _name = CaselessKeyword("name").suppress() + _component = Keyword("RosComponents").suppress() + _interface = Keyword("ComponentInterface").suppress() + + # Parameter Def + _parameters = Keyword("RosParameters").suppress() + _parameter = Keyword("RosParameter").suppress() + _ref_parameter = Keyword("RefParameter").suppress() + _value = Keyword("value").suppress() + + # Subscriber Def + _subscribers = Keyword("RosSubscribers").suppress() + _subscriber = Keyword("RosSubscriber").suppress() + _ref_subscriber = Keyword("RefSubscriber").suppress() + + # Subscriber Def + _publishers = Keyword("RosPublishers").suppress() + _publisher = Keyword("RosPublisher").suppress() + _ref_publisher = Keyword("RefPublisher").suppress() + + # ServiceServers Def + _services = Keyword("RosSrvServers").suppress() + _service = Keyword("RosServiceServer").suppress() + _ref_service = Keyword("RefServer").suppress() + + # ServiceClients Def + _srv_clients = Keyword("RosSrvClients").suppress() + _srv_client = Keyword("RosServiceClient").suppress() + _ref_srv_client = Keyword("RefClient").suppress() + + # ActionServers Def + _action_servers = Keyword("RosActionServers").suppress() + _action_server = Keyword("RosActionServer").suppress( + ) | Keyword("RosServer").suppress() + _ref_server = Keyword("RefServer").suppress() + + # Actio Clients Def + _action_clients = Keyword("RosActionClients").suppress() + _action_client = Keyword("RosActionClient").suppress( + ) | Keyword("RosClient").suppress() + _ref_action_client = Keyword("RefClient").suppress() + + # Topic Connections Def + _topic_connections = Keyword("TopicConnections").suppress() + _topic_connection = Keyword("TopicConnection").suppress() + _from = Keyword("From").suppress() + _to = Keyword("To").suppress() + + # global parameters Def + _g_parameters = Keyword("Parameters").suppress() + _g_parameter = Keyword("Parameter").suppress() + _type = Keyword("type").suppress() + _value = Keyword("value").suppress() + + listStr << delimitedList(Group(OCB + delimitedList(values) + CCB)) + mapStr << (OSB + delimitedList(Group(OCB + delimitedList((Group( + sglQuotedString.setParseAction(removeQuotes) + Suppress(":") + values))) + CCB)) + CSB) + mapStr.setParseAction(parseActionDict) + + param_value << _value + (values | listStr) + + parameter = Group(_parameter + name("param_name") + + OCB + _ref_parameter + name("param_path") + Optional(param_value("param_value")) + CCB) + parameters = (_parameters + OCB + + OneOrMore(parameter + Optional(",").suppress()) + CCB) + + subscriber = Group(_subscriber + name("sub_name") + + OCB + _ref_subscriber + name("sub_path") + CCB) + subscribers = (_subscribers + OCB + + OneOrMore(subscriber + Optional(",").suppress()) + CCB) + + publisher = Group(_publisher + name("pub_name") + + OCB + _ref_publisher + name("pub_path") + CCB) + publishers = (_publishers + OCB + + OneOrMore(publisher + Optional(",").suppress()) + CCB) + + service = Group(_service + name("srv_name") + + OCB + _ref_service + name("srv_path") + CCB) + services = (_services + OCB + + OneOrMore(service + Optional(",").suppress()) + CCB) + + srv_client = Group(_srv_client + name("srv_name") + + OCB + _ref_srv_client + name("srv_path") + CCB) + srv_clients = (_srv_clients + OCB + + OneOrMore(srv_client + Optional(",").suppress()) + CCB) + + action_server = Group(_action_server + name("action_name") + + OCB + _ref_server + name("action_path") + CCB) + action_servers = (_action_servers + OCB + + OneOrMore(action_server + Optional(",").suppress()) + CCB) + + action_client = Group(_action_client + name("action_name") + + OCB + _ref_action_client + name("action_path") + CCB) + action_clients = (_action_clients + OCB + + OneOrMore(action_client + Optional(",").suppress()) + CCB) + + topic_connection = Group(_topic_connection + name("topic_name") + + OCB + _from + ORB + name("from") + CRB + _to + + ORB + name("to") + CRB + CCB) + + topic_connections = (_topic_connections + OCB + + OneOrMore(topic_connection + Optional(",").suppress()) + CCB) + + g_parameter = Group(_g_parameter + OCB + _name + name("param_name") + + _type + name("value_type") + Optional(param_value("param_value")) + CCB) + g_parameters = (_g_parameters + OCB + + OneOrMore(g_parameter + Optional(",").suppress()) + CCB) + + interface = Group( + _interface + + OCB + + _name + name("interface_name") + + Optional(parameters)("parameters") + + Optional(publishers)("publishers") + + Optional(subscribers)("subscribers") + + Optional(services)("services") + + Optional(srv_clients)("srv_clients") + + Optional(action_servers)("action_servers") + + Optional(action_clients)("action_clients") + + CCB) + + self.rossystem_grammar = _system + \ + OCB + \ + _name + name("system_name") + \ + _component + ORB + \ + OneOrMore(interface + Optional(",").suppress())("interfaces") + CRB \ + + Optional(topic_connections)("topic_connections") \ + + Optional(g_parameters)("global_parameters") + CCB + self._model = model + self._isFile = isFile + + def _parse_from_string(self): + self._result = self.rossystem_grammar.parseString(self._model) + + def _parse_from_file(self): + self._result = self.rossystem_grammar.parseFile(self._model) + + def parse(self): + self._result = ParseResults() + try: + if self._isFile: + self._parse_from_file() + else: + self._parse_from_string() + except Exception as e: + print(e.args) # Should set a default 'result'? + return self._result + + +if __name__ == "__main__": + import os + my_path = os.path.abspath(os.path.dirname(__file__)) + path = os.path.join( + my_path, "../../resources/robotino.rossystem") + print(path) + + parser = RosSystemModelParser(path) + try: + print(parser.parse().dump()) + # print(parser.parse().interfaces[2].services) + except Exception as e: + print(e.args) diff --git a/rosHumble_model_extractor_scripts/ros_pkg_runner.py b/rosHumble_model_extractor_scripts/ros_pkg_runner.py new file mode 100644 index 0000000..318ff80 --- /dev/null +++ b/rosHumble_model_extractor_scripts/ros_pkg_runner.py @@ -0,0 +1,169 @@ +#!/usr/bin/env python +import sys +import os +import subprocess +from datetime import datetime +import shutil +from rosHumble_model_extractor_scripts.file_handling import fileHandling +import glob + +class rosPkgRunner: + + + + def __init__(self, + inputPkgName:str, + inputNodeName:str, + typeOfRequest:str, + outputDir:str, + pathToROSws:str, + gitRepo:str, + setupBashFile:str): + + self.setupBashFile = setupBashFile + self.outputDir = outputDir + + # Private attr + self._inputPkgName = inputPkgName + self._inputNodeName = inputNodeName + self._typeOfRequest = typeOfRequest + self._pathToROSws = pathToROSws + self._gitRepo = gitRepo + self._pathToSRCcode = None + + self._ROS_model_extractor_plugin_file = "rosHumble_model_extractor.py" + + self.mkOuputDir() + + + def getROSModelExtractorPluginFile(self,): + return self._ROS_model_extractor_plugin_file + + def getInputPkgName(self,): + return self._inputPkgName + + def getInputNodeName(self,): + return self._inputNodeName + + def getTypeOfRequest(self,): + return self._typeOfRequest + + def getPathToROSws(self,): + return self._pathToROSws + + def getGitRepo(self,): + return self._gitRepo + + def runSetupBash(self,): + if(os.path.isfile(self.setupBashFile)): + filePath = os.path.join(self.getPathToROSws(), self.setupBashFile) + subprocess.run(['source', filePath], executable="/bin/bash") + + def getPathToSRCcode(self,): + return self._pathToSRCcode + + def setPathToSRCcode(self, newValue): + self._pathToSRCcode = newValue + + def getGitModelRepo(self): + return self.getGitRepo().split(' ')[0] + + + def runROSdepInstall(self,): + subprocess.run(["rosdep", "install", "-y", "-i", "-r", "--from-path", "src"]) + + def runCommonCmd(self,): + self.runSetupBash() + self.runROSdepInstall() + + + def startInstallProcedure(self,): + print("Add additoinal install procedure for respective ROS version") + + def harosInit(self,): + print("## Init HAROS ##") + subprocess.run(["haros","init"]) + + def finishedMsg(self,extractorFile): + print("~~~~~~~~~~~") + print("Extraction finished. See the following report:") + subprocess.run(["cat", extractorFile]) + print("~~~~~~~~~~~\n\n") + + def showROSmodel(self, modelFile): + fileHandling.showFileContent(modelFile) + + + def showAllROSmodel(self,): + + fileList = fileHandling.filterFileNamesByExtension(self.outputDir, 'ros') + + for fileName in fileList: + self.showROSmodel(fileName) + + def cleanUPsrc(self): + + pathToCheck = rosPkgRunner.subDirList(os.path.join(self.getPathToROSws(), 'src')) + for dirs in pathToCheck: + print( "Delete: ", dirs) + shutil.rmtree(dirs) + + def callHarosPlugin(self,): + + print("Update code to execute harosPlugin command!!") + + def mkOuputDir(self,): + + current_timestamp = str(datetime.now().strftime("%Y-%m-%d %H-%M-%S")) + current_timestamp = current_timestamp.split(' ') + output_dir = os.path.join(self.outputDir, current_timestamp[0], current_timestamp[-1]) + + if(not os.path.isdir(output_dir)): + os.makedirs(output_dir) + + self.outputDir = output_dir + print("## Output Directory Created at : {} ##".format(output_dir)) + + def extractorRun(self,): + print(" Write the procedure to run the model extractor file with configuration:") + + @staticmethod + def checkIfValidPkgDir(dirToCheck:str, fileToCheck='package.xml' ): + fileList = os.listdir(dirToCheck) + fileList = [f for f in fileList if os.path.isfile(dirToCheck+'/'+f)] + + numOfFileToCheck = fileList.count(fileToCheck) + if numOfFileToCheck == 1: + return True + else: + if numOfFileToCheck > 0: + print("### More than one {} found in {}, rejected as valid pakage directory ###".format(fileToCheck, dirToCheck)) + return False + + def validatePathToSRCcodeList(self,): + newList = [] + for dirName in self.getPathToSRCcode(): + if rosPkgRunner.checkIfValidPkgDir(dirName): + newList.append(dirName) + + self.setPathToSRCcode(newList) + + @staticmethod + def cloneRepo(ws_dir:str, repo_url:str): + clone_cmd = ["git", "clone" ] + + src_dir = os.path.join(ws_dir, "src/") + print(ws_dir, repo_url, type(ws_dir), src_dir) + os.chdir(src_dir) + repo_url = repo_url.split(' ') + clone_cmd = clone_cmd + repo_url + subprocess.run(clone_cmd) + print("Repository cloned successfully.") + os.chdir(ws_dir) + + + + @staticmethod + def subDirList(parent_dir): + subdirs = [os.path.join(parent_dir, d) for d in os.listdir(parent_dir) if os.path.isdir(os.path.join(parent_dir, d))] + return subdirs \ No newline at end of file diff --git a/ros_model_extractor.py b/ros_model_extractor.py index 1cbe400..a1d5295 100755 --- a/ros_model_extractor.py +++ b/ros_model_extractor.py @@ -44,7 +44,7 @@ def launch(self): #BONSAI PARSER parser = CppAstParser(workspace = ws) parser.set_library_path("/usr/lib/llvm-10/lib") - parser.set_standard_includes("/usr/lib/llvm-10/lib/clang/10.0.0/include") + parser.set_standard_includes("/usr/lib/llvm-14/lib/clang/10.0.0/include") db_dir = os.path.join(ws, "build") if os.path.isfile(os.path.join(db_dir, "compile_commands.json")): parser.set_database(db_dir) @@ -62,6 +62,7 @@ def extract_node(self, name, node_name, pkg_name, ns, ws, rossystem): self.pkg.path= self.args.path_to_src self.pkg_type="AmentPackage" roscomponent = None + #HAROS NODE EXTRACTOR srcdir = self.pkg.path[len(ws):] @@ -308,7 +309,7 @@ def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent if node.language == "cpp": for call in (CodeQuery(gs).all_calls.get()): if "Publisher" in str(call): - #print(call) + print("Printing calll========================", call) if len(call.arguments) > 1: name = analysis._extract_topic(call, topic_pos=0) msg_type = analysis._extract_message_type(call) @@ -325,8 +326,8 @@ def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent if name!="?" or msg_type!="?": RosModel_node.add_subscriber(name, msg_type.replace("/",".").replace(".msg","")) for call in (CodeQuery(gs).all_calls.get()): - if "Service" in str(call) and "::srv::" in str(call): - #print(call) + if "Service" in str(call) or "::srv::" in str(call): + print(call) if len(call.arguments) > 1: name = analysis._extract_topic(call, topic_pos=0) srv_type = analysis._extract_message_type(call)