diff --git a/README.md b/README.md index ec4965b..c594bf4 100644 --- a/README.md +++ b/README.md @@ -2,11 +2,6 @@ [![License](https://img.shields.io/badge/License-BSD%203--Clause-blue.svg)](https://opensource.org/licenses/BSD-3-Clause) -[![MELODIC build status](https://github.com/ipa320/ros-model-extractors/actions/workflows/build_melodic.yml/badge.svg)](https://github.com/ipa320/ros-model-extractors/actions/workflows/build_melodic.yml) -[![NOETIC ros-model-extractors](https://github.com/ipa320/ros-model-extractors/actions/workflows/build_noetic.yml/badge.svg)](https://github.com/ipa320/ros-model-extractors/actions/workflows/build_noetic.yml) -[![FOXY build status](https://github.com/ipa320/ros-model-extractors/actions/workflows/build_foxy.yml/badge.svg)](https://github.com/ipa320/ros-model-extractors/actions/workflows/build_foxy.yml) - - Technical Maintainer: [**ipa-nhg**](https://github.com/ipa-nhg/) (**Nadia Hammoudeh Garcia**, **Fraunhofer IPA**) - **nadia.hammoudeh.garcia@ipa.fraunhofer.de** This repository contains the HAROS framework plugin to automatically generate models according to the DSLs defined for RosModel. @@ -17,6 +12,7 @@ This package also contains a set of ROS containers where you can easily do the a As related work you can read the following paper: [Bootstrapping MDE development from ROS manual code: Part 2—Model generation and leveraging models at runtime](https://link.springer.com/article/10.1007/s10270-021-00873-2?wt_mc=Internal.Event.1.SEM.ArticleAuthorOnlineFirst&utm_source=ArticleAuthorOnlineFirst&utm_medium=email&utm_content=AA_en_06082018&ArticleAuthorOnlineFirst_20210420) +:bangbang: The updated version of the development is only being supported for ROS2, the RO1 implementation (old version) is available under [ros1 branch](https://github.com/ipa320/ros-model-extractors/tree/ros1) ### HowTo Use the docker container to run the ros-model plugin for HAROS @@ -26,7 +22,7 @@ Build the HAROS docker image, for your desired ROS distro version: ``` cd path-to-ros-model-extractors-repo -[sudo] docker build --tag=haros_ROSDISTRO -f ROSDISTRO/Dockerfile . +[sudo] docker build --tag=haros_ROSDISTRO -f docker/ROSDISTRO/Dockerfile . ``` Call the ros-model extractor plugin, remember you have to also clone the repository to be analysed: @@ -48,13 +44,8 @@ Additionally, the analysis offers the option to analyze all the nodes of a packa ``` Please check the available examples for the supported distros: - -- [ROS1 melodic](melodic/README.md) -- [ROS1 noetic](noetic/README.md) -- [ROS2 foxy](foxy/README.md) - [ROS2 humble](humble/README.md) ToDo: - - Extractor of interfaces types (msgs, srvs and actions) - Parser for launch files and analysis of the full system diff --git a/docker-compose.yml b/docker/docker-compose.yml similarity index 100% rename from docker-compose.yml rename to docker/docker-compose.yml diff --git a/foxy/Dockerfile b/docker/foxy/Dockerfile similarity index 100% rename from foxy/Dockerfile rename to docker/foxy/Dockerfile diff --git a/foxy/README.md b/docker/foxy/README.md similarity index 100% rename from foxy/README.md rename to docker/foxy/README.md diff --git a/haros_runner.sh b/docker/haros_runner.sh similarity index 79% rename from haros_runner.sh rename to docker/haros_runner.sh index 843d3d9..342fb48 100755 --- a/haros_runner.sh +++ b/docker/haros_runner.sh @@ -44,7 +44,7 @@ then then source install/setup.bash rosdep install -y -i -r --from-path src - colcon build --cmake-args -DCMAKE_EXPORT_COMPILE_COMMANDS=ON + colcon build --cmake-args -DCMAKE_EXPORT_COMPILE_COMMANDS=1 --no-warn-unused-cli source ${5}/install/setup.bash colcon list > /tmp/colcon_list.txt path_to_src_code=$(cat /tmp/colcon_list.txt | grep "^$1" | awk '{ print $2}') @@ -91,16 +91,14 @@ then else python /ros_model_extractor.py --clang-version $clang_version --package "$1" --name "$2" --"${3}" --model-path "${4}" --ws "${5}" --repo $model_repo>> ${4}/extractor.log fi - #cat extractor.log elif [[ $PYTHON_VERSION == "3" ]] then if [ "${2}" = "--all" ] then - python3 /ros_model_extractor.py --clang-version $clang_version --package "$1" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo -a >> ${4}/extractor.log + python3 /ros_code_analysis/ros_model_extractor.py --clang-version $clang_version --package "$1" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo -a >> ${4}/extractor.log else - python3 /ros_model_extractor.py --clang-version $clang_version --package "$1" --name "$2" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo>> ${4}/extractor.log + python3 /ros_code_analysis/ros_model_extractor.py --clang-version $clang_version --package "$1" --name "$2" --"${3}" --model-path "${4}" --ws "${5}" --path-to-src "$path_to_src_code" --repo $model_repo>> ${4}/extractor.log fi - #cat extractor.log else echo "Python version not supported" exit @@ -115,8 +113,14 @@ echo "Extraction finished. See the following report:" cat ${4}/extractor.log echo "~~~~~~~~~~~" + +#echo "~~~~~~~~~~~" +#echo "Compile commands file:" +#cat ${5}/build/compile_commands.json +#echo "~~~~~~~~~~~" + echo "###########" -for generated_model in "${4}"/*.ros +for generated_model in "${4}"/*.ros2 do echo "~~~~~~~~~~~" echo "Print of the model: $generated_model:" @@ -128,4 +132,4 @@ echo "###########" done ## Clean and finish -rm -rf ${5}/src/* +#find ${5}/src -maxdepth 1 -type d ! -iname ros2model -exec rm -rvf {} \; diff --git a/humble/Dockerfile b/docker/humble/Dockerfile similarity index 80% rename from humble/Dockerfile rename to docker/humble/Dockerfile index ba2da99..c7c7744 100644 --- a/humble/Dockerfile +++ b/docker/humble/Dockerfile @@ -25,7 +25,7 @@ ENV LD_LIBRARY_PATH $LD_LIBRARY_PATH:/usr/lib/llvm-14/lib RUN pip3 install --upgrade pip RUN pip3 install -Iv clang==14.0 RUN pip3 install -e git+https://github.com/timtadh/pyflwor.git#egg=pyflwor -RUN pip3 install -e git+https://github.com/ipa320/ros_model_parser.git#egg=ros_model_parser +RUN pip3 install -e git+https://github.com/ipa320/ros2model.git#egg=ros2model RUN pip3 install -e git+https://github.com/git-afsantos/bonsai#egg=bonsai-code RUN pip3 install -e git+https://github.com/ipa-nhg/haros@FixPythonExtractTopic#egg=haros # RUN pip3 install -e git+https://github.com/git-afsantos/haros#egg=haros @@ -41,6 +41,8 @@ RUN usermod -a -G root extractor RUN echo "extractor ALL=(ALL:ALL) NOPASSWD: ALL" | sudo tee /etc/sudoers.d/extractor USER extractor RUN mkdir -p /home/extractor/ws/src +#RUN git clone https://github.com/ipa320/ros2model.git /home/extractor/ws/src/ros2model +RUN git clone https://github.com/ipa-nhg/ros2model.git -b PythonInstallTemplatesFolder /home/extractor/ws/src/ros2model RUN mkdir -p /home/extractor/results RUN chown extractor:extractor /home/extractor/results @@ -55,7 +57,7 @@ RUN if [ $enable_ssh ] ; then mkdir -p /home/extractor/.ssh/ && \ touch /home/extractor/.ssh/config && \ chmod 600 /home/extractor/.ssh/config ; fi -COPY --chown=extractor:root ssh_config/ /keys/ +COPY --chown=extractor:root docker/ssh_config/ /keys/ RUN if [ $enable_ssh ] ; then cat /keys/ssh_key.pub >> /home/extractor/.ssh/authorized_keys ; fi RUN if [ $enable_ssh ] ; then cat /keys/config >> /home/extractor/.ssh/config ; fi #### @@ -64,7 +66,7 @@ USER extractor ENV CMAKE_CXX_COMPILER /usr/lib/llvm-14/bin/clang++ RUN source /opt/ros/$ROS_DISTRO/setup.bash;\ cd /home/extractor/ws;\ - colcon build --cmake-args -DCMAKE_EXPORT_COMPILE_COMMANDS=ON ;\ + colcon build --cmake-args -DCMAKE_EXPORT_COMPILE_COMMANDS=1 --no-warn-unused-cli ;\ source /home/extractor/ws/install/setup.bash; \ haros init @@ -72,11 +74,12 @@ ENV PYTHON_VERSION 3 RUN echo 'source /home/extractor/ws/install/setup.bash' >> /home/extractor/.bashrc #RUN echo "test" -COPY ${path_to_scripts}messages_generator_runner.sh / -COPY ${path_to_scripts}generate_messages_model_helper.sh / -COPY ${path_to_scripts}haros_runner.sh / -COPY ${path_to_scripts}ros_model_extractor.py / -COPY ${path_to_scripts}test.sh / +#COPY ${path_to_scripts}messages_generator_runner.sh / +#COPY ${path_to_scripts}generate_messages_model_helper.sh / +COPY docker/haros_runner.sh / +COPY ros_code_analysis / + +COPY helper_scripts/test.sh / EXPOSE 4005 #CMD sudo chown -R extractor:extractor /home/extractor/results diff --git a/humble/README.md b/docker/humble/README.md similarity index 78% rename from humble/README.md rename to docker/humble/README.md index 079bd30..aae1a0c 100644 --- a/humble/README.md +++ b/docker/humble/README.md @@ -5,7 +5,7 @@ Install docker https://docs.docker.com/install/linux/docker-ce/ubuntu/ Build the HAROS docker image, for your desired ROS distro version: ``` cd path-to-ros-model-extractors-repo -[sudo] docker build --tag=haros_humble -f humble/Dockerfile . +[sudo] docker build --tag=haros_humble -f docker/humble/Dockerfile . ``` Call the ros-model extractor plugin, remember you have to also clone the repository to be analysed: @@ -21,7 +21,8 @@ For example: [sudo] docker run -it haros_humble:latest /haros_runner.sh turtlesim turtlesim_node node . /home/extractor/ws "https://github.com/ros/ros_tutorials -b humble" -[sudo] docker run -it haros_humble:latest /haros_runner.sh test_pkg test_node node . /home/extractor/ws "https://github.com/ipa-nhg/test_ros2_code_extractor -b ros2Parameters" +[sudo] docker run -it haros_humble:latest /haros_runner.sh cpp_basic --all node . /home/extractor/ws "https://github.com/ipa-nhg/cpp_basic_ros2" + ``` diff --git a/melodic/Dockerfile b/docker/melodic/Dockerfile similarity index 100% rename from melodic/Dockerfile rename to docker/melodic/Dockerfile diff --git a/melodic/README.md b/docker/melodic/README.md similarity index 100% rename from melodic/README.md rename to docker/melodic/README.md diff --git a/noetic/Dockerfile b/docker/noetic/Dockerfile similarity index 86% rename from noetic/Dockerfile rename to docker/noetic/Dockerfile index 5569b41..e01b2f1 100644 --- a/noetic/Dockerfile +++ b/docker/noetic/Dockerfile @@ -26,10 +26,11 @@ ENV LD_LIBRARY_PATH $LD_LIBRARY_PATH:/usr/lib/llvm-10/lib RUN pip3 install --upgrade pip RUN pip3 install -Iv clang==10.0.1 RUN pip3 install -e git+https://github.com/timtadh/pyflwor.git#egg=pyflwor -RUN pip3 install -e git+https://github.com/ipa320/ros_model_parser.git#egg=ros_model_parser +#RUN pip3 install -e git+https://github.com/ipa320/ros_model_parser.git#egg=ros_model_parser RUN pip3 install -e git+https://github.com/git-afsantos/bonsai#egg=bonsai-code RUN pip3 install -e git+https://github.com/ipa-nhg/haros@FixPythonExtractTopic#egg=haros # RUN pip3 install -e git+https://github.com/git-afsantos/haros#egg=haros +RUN pip3 install -e git+https://github.com/ipa320/ros2model.git#egg=ros2model RUN apt-get update && apt-get install -y ros-noetic-desktop && apt upgrade -y @@ -56,7 +57,7 @@ RUN if [ $enable_ssh ] ; then mkdir -p /home/extractor/.ssh/ && \ touch /home/extractor/.ssh/config && \ chmod 600 /home/extractor/.ssh/config ; fi -COPY --chown=extractor:root ssh_config/ /keys/ +COPY --chown=extractor:root docker/ssh_config/ /keys/ RUN if [ $enable_ssh ] ; then cat /keys/ssh_key.pub >> /home/extractor/.ssh/authorized_keys ; fi RUN if [ $enable_ssh ] ; then cat /keys/config >> /home/extractor/.ssh/config ; fi #### @@ -75,11 +76,12 @@ RUN source /opt/ros/$ROS_DISTRO/setup.bash;\ ENV PYTHON_VERSION 3 RUN echo 'source /home/extractor/ws/devel/setup.bash' >> /home/extractor/.bashrc -COPY ${path_to_scripts}messages_generator_runner.sh / -COPY ${path_to_scripts}generate_messages_model_helper.sh / -COPY ${path_to_scripts}haros_runner.sh / -COPY ${path_to_scripts}ros_model_extractor.py / -COPY ${path_to_scripts}test.sh / +#COPY ${path_to_scripts}messages_generator_runner.sh / +#COPY ${path_to_scripts}generate_messages_model_helper.sh / +COPY docker/haros_runner.sh / +COPY ros_code_analysis / + +COPY helper_scripts/test.sh / EXPOSE 4004 #CMD sudo chown -R extractor:extractor /home/extractor/results diff --git a/noetic/README.md b/docker/noetic/README.md similarity index 100% rename from noetic/README.md rename to docker/noetic/README.md diff --git a/ssh_config/dummy_ssh b/docker/ssh_config/dummy_ssh similarity index 100% rename from ssh_config/dummy_ssh rename to docker/ssh_config/dummy_ssh diff --git a/haros_runner.py b/haros_runner.py deleted file mode 100644 index 8c43c25..0000000 --- a/haros_runner.py +++ /dev/null @@ -1,44 +0,0 @@ -#!/usr/bin/env python -import sys -import os -import subprocess -from datetime import datetime -import shutil -import glob - -from rosHumble_model_extractor_scripts.ros2runner import ros2Runner - -# Arguments: -# 1: Package name or Git repository name -# 2: Node name or launch file name or '--all' to analyse all the available nodes -# 3: Type of the request: either 'launch' or 'node' -# 4: Path to the folder where the resulting model files should be stored -# 5: Path to the ROS workspace -# 6: Http address links of the Git repositories (to indicate the branch put the name of the repository between quotes and add the suffix -b *banch_name*, for example "https://github.com/ipa320/ros-model-extractors b main" - -def main(): - - CLI_args = sys.argv[1:] - - if len(CLI_args) < 6: - print(f'ERROR: At least 6 arguments expected, only {len(CLI_args)} are given.') - sys.exit() - - pkgName = sys.argv[1] - NodeName = sys.argv[2] - typeOfRequest = sys.argv[3] - pathToOutput = sys.argv[4] - pathToROSws = sys.argv[5] - gitRepo = sys.argv[6] - - ros2Extractor = ros2Runner(inputPkgName=pkgName, - inputNodeName=NodeName, - typeOfRequest=typeOfRequest, - pathToROSws=pathToROSws, - gitRepo=gitRepo, - outputDir=pathToOutput) - ros2Extractor.extractorRun(True) - - -if __name__ == "__main__": - main() \ No newline at end of file diff --git a/generate_messages_model_helper.sh b/helper_scripts/generate_messages_model_helper.sh similarity index 100% rename from generate_messages_model_helper.sh rename to helper_scripts/generate_messages_model_helper.sh diff --git a/messages_generator_runner.sh b/helper_scripts/messages_generator_runner.sh similarity index 100% rename from messages_generator_runner.sh rename to helper_scripts/messages_generator_runner.sh diff --git a/test.sh b/helper_scripts/test.sh similarity index 100% rename from test.sh rename to helper_scripts/test.sh diff --git a/ros_code_analysis/ros_code_analysis/__init__.py b/ros_code_analysis/ros_code_analysis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ros_code_analysis/ros_code_analysis/ros1_cpp_extractor.py b/ros_code_analysis/ros_code_analysis/ros1_cpp_extractor.py new file mode 100755 index 0000000..de1d898 --- /dev/null +++ b/ros_code_analysis/ros_code_analysis/ros1_cpp_extractor.py @@ -0,0 +1,140 @@ +#!/usr/bin/env python +# +# Copyright 2024 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bonsai.analysis import CodeQuery, resolve_expression +from ros_common_extractor import RosCommonExtractor +import ros2model.core.metamodels.metamodel_ros as RosModelMetamodel + +class Ros1CppExtractor(): + def extract_primitives(node, parser, analysis): + gs = parser.global_scope + node.source_tree = parser.global_scope + publishers=[] + subscribers=[] + serviceservers=[] + serviceclients=[] + actionservers=[] + actionclients=[] + parameters=[] + for call in (CodeQuery(gs).all_calls.get()): + for call in (CodeQuery(gs).all_calls.where_name("SimpleActionServer").get()): + if len(call.arguments) > 0: + name = analysis._extract_action(call) + action_type = analysis._extract_action_type(call).split("_<",1)[0] + if name!="?" or act_type!="?": + acs = RosModelMetamodel.ActionServer(name=name,type=act_type.replace(".action","")) + actionservers.append(acs) + for call in (CodeQuery(gs).all_calls.where_name("SimpleActionClient").get()): + if len(call.arguments) > 0: + name = analysis._extract_action(call) + action_type = analysis._extract_action_type(call).split("_<",1)[0] + if name!="?" or act_type!="?": + ac = RosModelMetamodel.ActionServer(name=name,type=act_type.replace(".action","")) + actionclients.append(ac) + for call in (CodeQuery(gs).all_calls.where_name("advertise").where_result("ros::Publisher").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or msg_type!="?": + pub = RosModelMetamodel.Publisher(name=name,type=msg_type.replace(".msg","")) + publishers.append(pub) + for call in (CodeQuery(gs).all_calls.where_name("subscribe").where_result("ros::Subscriber").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or msg_type!="?": + sub = RosModelMetamodel.Subscriber(name=name,type=msg_type.replace(".msg","")) + subscribers.append(sub) + for call in (CodeQuery(gs).all_calls.where_name("advertiseService").where_result("ros::ServiceServer").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call) + srv_type = analysis._extract_message_type(call) + if name!="?" or srv_type!="?": + ss = RosModelMetamodel.ServiceServer(name=name,type=srv_type.replace(".srv","").replace("Request","")) + serviceservers.append(ss) + for call in (CodeQuery(gs).all_calls.where_name("serviceClient").where_result("ros::ServiceClient").get()): + if len(call.arguments) > 1: + name = analysis._extract_topic(call) + srv_type = analysis._extract_message_type(call) + if name!="?" or srv_type!="?": + sc = RosModelMetamodel.ServiceClient(name=name,type=srv_type.replace(".srv","").replace("Response","")) + serviceclients.append(sc) + + #PARAMETERS nhg:this needs review + nh_prefix = "c:@N@ros@S@NodeHandle@" + gets = ("getParam", "getParamCached", "param") + reads = gets + ("hasParam", "searchParam") + sets = ("setParam",) + writes = sets + ("deleteParam",) + for call in CodeQuery(gs).all_calls.where_name(reads).get(): + if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): + param_type = default_value = None + param_name = analysis._extract_topic(call) + if call.name in gets: + param_type = RosCommonExtractor.transform_type(analysis._extract_param_type(call.arguments[1])) + if call.name == "param": + if len(call.arguments) > 2: + default_value = analysis._extract_param_value( call, arg_pos=2) + elif len(call.arguments) == 2: + default_value = analysis._extract_param_value( call, arg_pos=1) + if not ((default_value is None or default_value == "") and param_type is None): + param = RosModelMetamodel.Parameter(name=param_name,type=param_type,value=default_value) + parameters.append(param) + + for call in CodeQuery(gs).all_calls.where_name(writes).get(): + if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): + param_type = value = None + param_name = analysis._extract_topic(call) + if len(call.arguments) >= 2 and call.name in sets: + param_type = RosCommonExtractor.transform_type(analysis._extract_param_type(call.arguments[1])) + value = analysis._extract_param_value(call, arg_pos=1) + if not ((default_value is None or default_value == "") and param_type is None): + param = RosModelMetamodel.Parameter(name=param_name,type=param_type,value=default_value) + parameters.append(param) + ros_prefix = "c:@N@ros@N@param@" + gets = ("get", "getCached", "param") + reads = gets + ("has",) + sets = ("set",) + writes = sets + ("del",) + for call in CodeQuery(gs).all_calls.where_name(reads).get(): + if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): + param_type = default_value = None + param_name = analysis._extract_topic(call) + if call.name == "param": + if call.name in gets: + param_type = RosCommonExtractor.transform_type(analysis._extract_param_type(call.arguments[1])) + if len(call.arguments) > 2: + default_value = analysis._extract_param_value(call, arg_pos=2) + elif len(call.arguments) == 2: + default_value = analysis._extract_param_value(call, arg_pos=1) + if not ((default_value is None or default_value == "") and param_type is None): + param = RosModelMetamodel.Parameter(name=param_name,type=param_type,value=default_value) + parameters.append(param) + for call in CodeQuery(gs).all_calls.where_name(writes).get(): + if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): + param_type = value = None + if len(call.arguments) >= 2 and call.name in sets: + param_type = RosCommonExtractor.transform_type(analysis._extract_param_type(call.arguments[1])) + value = analysis._extract_param_value(call, arg_pos=1) + param_name = analysis._extract_topic(call) + if not ((default_value is None or default_value == "") and param_type is None): + param = RosModelMetamodel.Parameter(name=param_name,type=param_type,value=default_value) + parameters.append(param) + return publishers, subscribers, serviceservers, serviceclients, actionservers, actionclients, parameters \ No newline at end of file diff --git a/ros_code_analysis/ros_code_analysis/ros1_python_extractor.py b/ros_code_analysis/ros_code_analysis/ros1_python_extractor.py new file mode 100644 index 0000000..cd93979 --- /dev/null +++ b/ros_code_analysis/ros_code_analysis/ros1_python_extractor.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# +# Copyright 2024 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bonsai.analysis import CodeQuery, resolve_expression +from ros_common_extractor import RosCommonExtractor +import ros2model.core.metamodels.metamodel_ros as RosModelMetamodel + +class Ros1PythonExtractor(): + def extract_primitives(node, parser, analysis): + gs = parser.global_scope + node.source_tree = parser.global_scope + publishers=[] + subscribers=[] + serviceservers=[] + serviceclients=[] + actionservers=[] + actionclients=[] + parameters=[] + + msgs_list = [] + pkgs_list = [] + for imp_name in parser.imported_names_list: + s = str(imp_name) + if "msg" in s or "srv" in s: + ss = s.split(".") + if len(ss) < 2: + continue + if ss[-1] == "msg" or ss[-1] == "srv": + pkgs_list.append(ss[0]) + elif ss[1] == "msg" or ss[1] == "srv": + msgs_list.append((ss[0], ss[2])) + else: + log.debug(("Python import with 'msg' or 'srv', " + "but unable to process it: ") + + s) + for call in CodeQuery(gs).all_calls.get(): + if "rospy.Publisher" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) + pub = RosModelMetamodel.Publisher(name=name,type=msg_type.replace(".msg","")) + publishers.append(pub) + if "rospy.Subscriber" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) + sub = RosModelMetamodel.Subscriber(name=name,type=msg_type.replace(".msg","")) + subscribers.append(sub) + if "rospy.Service" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call, '', msgs_list) + ss = RosModelMetamodel.ServiceServer(name=name,type=srv_type.replace(".srv","").replace("Request","")) + serviceservers.append(ss) + if "rospy.ServiceProxy" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call, '', msgs_list) + sc = RosModelMetamodel.ServiceClient(name=name,type=srv_type.replace(".srv","").replace("Response","")) + serviceclients.append(sc) + if "rospy.set_param" in str(call): + param_name = analysis._extract_topic(call, topic_pos=0) + param_type = default_value = None + default_value = resolve_expression(call.arguments[1]) + param = RosModelMetamodel.Parameter(name=param_name,type=param_type,value=default_value) + parameters.append(param) \ No newline at end of file diff --git a/ros_code_analysis/ros_code_analysis/ros2_cpp_extractor.py b/ros_code_analysis/ros_code_analysis/ros2_cpp_extractor.py new file mode 100755 index 0000000..1b9311d --- /dev/null +++ b/ros_code_analysis/ros_code_analysis/ros2_cpp_extractor.py @@ -0,0 +1,94 @@ +#!/usr/bin/env python +# +# Copyright 2024 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from bonsai.analysis import CodeQuery, resolve_expression +from ros_common_extractor import RosCommonExtractor +import ros2model.core.metamodels.metamodel_ros as RosModelMetamodel + +class Ros2CppExtractor(): + def extract_primitives(node, parser, analysis): + gs = parser.global_scope + node.source_tree = parser.global_scope + publishers=[] + subscribers=[] + serviceservers=[] + serviceclients=[] + actionservers=[] + actionclients=[] + parameters=[] + for call in (CodeQuery(gs).all_calls.get()): + if "Publisher" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or msg_type!="?": + pub = RosModelMetamodel.Publisher(name=name,type=msg_type.replace(".msg","")) + publishers.append(pub) + if "Subscription" in str(call): # Subscription or Subscriber? + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + msg_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or msg_type!="?": + sub = RosModelMetamodel.Subscriber(name=name,type=msg_type.replace(".msg","")) + subscribers.append(sub) + if "Service" in str(call) and "::srv::" in str(call): #or? + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or srv_type!="?": + ss = RosModelMetamodel.ServiceServer(name=name,type=srv_type.replace(".srv","")) + serviceservers.append(ss) + if "Client" in str(call) and "::srv::" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + srv_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or srv_type!="?": + sc = RosModelMetamodel.ServiceClient(name=name,type=srv_type.replace(".srv","")) + serviceclients.append(sc) + if "Server" in str(call) and "::action::" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + act_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or act_type!="?": + acs = RosModelMetamodel.ActionServer(name=name,type=act_type.replace(".action","")) + actionservers.append(acs) + if "Client" in str(call) and "::action::" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + act_type = analysis._extract_message_type(call) + queue_size = analysis._extract_queue_size(call, queue_pos=1) + if name!="?" or act_type!="?": + ac = RosModelMetamodel.ActionServer(name=name,type=act_type.replace(".action","")) + actionclients.append(ac) + if "declare_parameter" in str(call): + if len(call.arguments) > 1: + name = analysis._extract_topic(call, topic_pos=0) + default_value = resolve_expression(call.arguments[1]) + param_type = RosCommonExtractor.transform_type(resolve_expression(call)) + if not default_value: + param = RosModelMetamodel.Parameter(name=name,type=param_type) + else: + param = RosModelMetamodel.Parameter(name=name,type=param_type,value=str(default_value)) + parameters.append(param) + + return publishers, subscribers, serviceservers, serviceclients, actionservers, actionclients, parameters \ No newline at end of file diff --git a/ros_code_analysis/ros_code_analysis/ros_common_extractor.py b/ros_code_analysis/ros_code_analysis/ros_common_extractor.py new file mode 100755 index 0000000..8934db7 --- /dev/null +++ b/ros_code_analysis/ros_code_analysis/ros_common_extractor.py @@ -0,0 +1,37 @@ +#!/usr/bin/env python +# +# Copyright 2024 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +class RosCommonExtractor(): + def transform_type(param_type): + #if os.environ.get("ROS_VERSION") == "2": + param_type=str(param_type) + param_type=param_type[param_type.find("[")+1:param_type.find("]")] + if param_type == 'double': + return 'Double' + elif param_type == 'bool': + return 'Boolean' + elif param_type == 'int' or param_type == 'long': + return 'Integer' + elif (param_type == 'str' or 'basic_string' in param_type or param_type == 'std::string'): + return 'String' + #elif param_type == 'yaml': + #return 'Struct' + #elif 'std::vector' in param_type: + #return 'List' + else: + return None \ No newline at end of file diff --git a/ros_code_analysis/ros_code_analysis/ros_model_extractor.py b/ros_code_analysis/ros_code_analysis/ros_model_extractor.py new file mode 100755 index 0000000..6d83f89 --- /dev/null +++ b/ros_code_analysis/ros_code_analysis/ros_model_extractor.py @@ -0,0 +1,192 @@ +#!/usr/bin/env python +# +# Copyright 2019 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os +import argparse +import subprocess +from ros2model.api.model_generator.component_generator import ComponentGenerator +import ros2model.core.metamodels.metamodel_ros as RosModelMetamodel + +import rospkg +from copy import deepcopy +#import ament_index_python +from haros.extractor import NodeExtractor, RoscppExtractor, RospyExtractor +from haros.metamodel import Node, Package, RosName, SourceFile +from haros.launch_parser import LaunchParser, LaunchParserError, NodeTag +from haros.cmake_parser import RosCMakeParser +from bonsai.analysis import CodeQuery, resolve_expression +from ros2_cpp_extractor import Ros2CppExtractor +from ros1_cpp_extractor import Ros1CppExtractor +from ros1_python_extractor import Ros1PythonExtractor + + +try: + from bonsai.cpp.clang_parser import CppAstParser +except ImportError: + CppAstParser = None +from bonsai.py.py_parser import PyAstParser + +class RosExtractor(): + def launch(self): + self.parse_arg() + ws = self.args.worspace_path + clang_version= self.args.clang_version + + #BONSAI PARSER + parser = CppAstParser(workspace = ws) + parser.set_library_path("/usr/lib/llvm-"+clang_version+"/lib") + parser.set_standard_includes("/usr/lib/llvm-"+clang_version+"/lib/clang/"+clang_version+".0.0/include") + db_dir = os.path.join(ws, "build") + if os.path.isfile(os.path.join(db_dir, "compile_commands.json")): + parser.set_database(db_dir) + else: + print("The compile_commands.json file can't be found") + if (self.args.node): + self.extract_node(self.args.name, self.args.name, self.args.package_name, None, ws, None) + def extract_node(self, name, node_name, pkg_name, ns, ws, rossystem): + self.pkg = Package(pkg_name) + if os.environ.get("ROS_VERSION") == "1": + rospack = rospkg.RosPack() + self.pkg.path = rospack.get_path(pkg_name) + self.pkg_type="CatkinPackage" + elif os.environ.get("ROS_VERSION") == "2": + self.pkg.path= self.args.path_to_src + self.pkg_type="AmentPackage" + roscomponent = None + + #HAROS NODE EXTRACTOR + srcdir = self.pkg.path[len(ws):] + srcdir = os.path.join(ws, srcdir.split(os.sep, 1)[0]) + bindir = os.path.join(ws, "build") + + #HAROS CMAKE PARSER + parser = RosCMakeParser(srcdir, bindir, pkgs = [self.pkg]) + model_str = "" + if os.path.isfile(os.path.join(self.pkg.path, "CMakeLists.txt")): + parser.parse(os.path.join(self.pkg.path, "CMakeLists.txt")) + node_name_dict = deepcopy(parser.executables) + node_name_dict.update(deepcopy(parser.libraries)) + for target in node_name_dict.values(): + print("INFO: Found artifact: "+target.output_name) + if (self.args.a): + node_name = target.output_name + node = Node(node_name, self.pkg, rosname=RosName(node_name)) + for file_in in target.files: + full_path = file_in + relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] + file_name = full_path.rsplit('/', 1)[-1] + source_file = SourceFile(file_name, relative_path , self.pkg) + node.source_files.append(source_file) + else: + if target.output_name == node_name: + node = Node(node_name, self.pkg, rosname=RosName(node_name)) + for file_in in target.files: + full_path = file_in + relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] + file_name = full_path.rsplit('/', 1)[-1] + source_file = SourceFile(file_name, relative_path , self.pkg) + node.source_files.append(source_file) + else: + continue + if node.language == "cpp": + parser = CppAstParser(workspace = ws) + analysis = RoscppExtractor(self.pkg, ws) + if node.language == "python": + parser = PyAstParser(workspace = ws) + analysis = RospyExtractor(self.pkg, ws) + for sf in node.source_files: + try: + if parser.parse(sf.path) is not None: + # ROS MODEL EXTRACT PRIMITIVES + if node.language == "python": + node_name=node_name.replace(".py","") + graph_name = RosModelMetamodel.GraphName(name=node_name, namespace="", full_name=node_name) + try: + if os.environ.get("ROS_VERSION") == "1": + if (node.language=="cpp"): + [publishers, subscribers, + serviceservers, serviceclients, + actionservers, actionclients, + parameters] = Ros1CppExtractor.extract_primitives(node, parser, analysis) + elif (node.language=="python"): + [publishers, subscribers, + serviceservers, serviceclients, + actionservers, actionclients, + parameters] = Ros1PythonExtractor.extract_primitives(node, parser, analysis) + elif os.environ.get("ROS_VERSION") == "2": + if (node.language=="cpp"): + [publishers, subscribers, + serviceservers, serviceclients, + actionservers, actionclients, + parameters] = Ros2CppExtractor.extract_primitives(node, parser, analysis) + elif (node.language=="python"): + print("ROS2 python...tbd...") + RosModel_node=RosModelMetamodel.Node(name=graph_name, + publisher=publishers, subscriber=subscribers, + serviceserver=serviceservers, serviceclient=serviceclients, + actionserver=actionservers, actionclient=actionclients, + parameter=parameters + ) + except error: + print("Interfaces not found") + print(error) + RosModel_node=RosModelMetamodel.Node(name=graph_name) + RosModel_artifact=RosModelMetamodel.Artifact(name=node_name, node=[RosModel_node]) + RosModel_package=RosModelMetamodel.Package(name=self.args.package_name, artifact=[RosModel_artifact]) + #Model file generator + node_generator = ComponentGenerator() + node_generator.generate_a_file( + model=RosModel_package, + output_dir=self.args.model_path, + filename=node_name+".ros2", + ) + else: + print("The model couldn't be extracted") + except: + pass + if rossystem is not None and roscomponent is not None: + rossystem.add_component(roscomponent) + if self.args.output: + print(model_str) + + def parse_arg(self): + parser = argparse.ArgumentParser() + mutually_exclusive = parser.add_mutually_exclusive_group() + mutually_exclusive.add_argument('--node', '-n', help="node analyse", action='store_true') + mutually_exclusive.add_argument('--launch', '-l', help="launch analyse", action='store_true') + parser.add_argument('--model-path', help='path to the folder in which the model files should be saved', + default='./', + nargs='?', const='./') + parser.add_argument('--output', help='print the model output') + parser.add_argument('--package', required=True, dest='package_name') + parser.add_argument('--name', required=False, dest='name') + parser.add_argument('--ws', required=True, dest='worspace_path') + parser.add_argument('--path-to-src', required=False, dest='path_to_src') + parser.add_argument('--repo', required=False, dest='repo') + parser.add_argument('-a', action='store_true') + parser.add_argument('--clang-version', required=True, dest='clang_version') + self.args = parser.parse_args() + +def main(argv = None): + extractor = RosExtractor() + if extractor.launch(): + return 0 + return 1 + +if __name__== "__main__": + main() diff --git a/ros_code_analysis/setup.py b/ros_code_analysis/setup.py new file mode 100755 index 0000000..098907e --- /dev/null +++ b/ros_code_analysis/setup.py @@ -0,0 +1,35 @@ +#!/usr/bin/env python +# +# Copyright 2024 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) +# +# Nadia Hammoudeh Garcia +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from setuptools import setup + +setup( + name='ros_code_analysis', + version='0.0.1', + description='Static code analysis plugin based on HAROS to extract information from ROS nodes', + url='https://github.com/ipa320/roscode2model', + author='Nadia Hammoudeh Garcia', + author_email='nadia.hammoudeh.garcia@ipa.fraunhofer.de', + license='Apache 2.0', + packages=['ros_code_analysis'], + install_requires=['haros', + 'bonsai', + ], + +) + diff --git a/ros_model_extractor.py b/ros_model_extractor.py deleted file mode 100755 index 7999b5f..0000000 --- a/ros_model_extractor.py +++ /dev/null @@ -1,446 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2019 Fraunhofer Institute for Manufacturing Engineering and Automation (IPA) -# -# Nadia Hammoudeh Garcia -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import argparse -import subprocess -from ros_model_generator.rosmodel_generator import RosModelGenerator -import ros_metamodels.ros_metamodel_core as RosModelMetamodel - -import rospkg -from copy import deepcopy -#import ament_index_python -from haros.extractor import NodeExtractor, RoscppExtractor, RospyExtractor -from haros.metamodel import Node, Package, RosName, SourceFile -from haros.launch_parser import LaunchParser, LaunchParserError, NodeTag -from haros.cmake_parser import RosCMakeParser -from bonsai.analysis import CodeQuery, resolve_expression - -try: - from bonsai.cpp.clang_parser import CppAstParser -except ImportError: - CppAstParser = None -from bonsai.py.py_parser import PyAstParser - -class RosExtractor(): - def launch(self): - self.parse_arg() - ws = self.args.worspace_path - clang_version= self.args.clang_version - - #BONSAI PARSER - parser = CppAstParser(workspace = ws) - parser.set_library_path("/usr/lib/llvm-"+clang_version+"/lib") - parser.set_standard_includes("/usr/lib/llvm-"+clang_version+"/lib/clang/"+clang_version+".0.0/include") - db_dir = os.path.join(ws, "build") - if os.path.isfile(os.path.join(db_dir, "compile_commands.json")): - parser.set_database(db_dir) - else: - print("The compile_commands.json file can't be found") - if (self.args.node): - self.extract_node(self.args.name, self.args.name, self.args.package_name, None, ws, None) - def extract_node(self, name, node_name, pkg_name, ns, ws, rossystem): - self.pkg = Package(pkg_name) - if os.environ.get("ROS_VERSION") == "1": - rospack = rospkg.RosPack() - self.pkg.path = rospack.get_path(pkg_name) - self.pkg_type="CatkinPackage" - elif os.environ.get("ROS_VERSION") == "2": - self.pkg.path= self.args.path_to_src - self.pkg_type="AmentPackage" - roscomponent = None - #HAROS NODE EXTRACTOR - - srcdir = self.pkg.path[len(ws):] - srcdir = os.path.join(ws, srcdir.split(os.sep, 1)[0]) - bindir = os.path.join(ws, "build") - #HAROS CMAKE PARSER - parser = RosCMakeParser(srcdir, bindir, pkgs = [self.pkg]) - model_str = "" - if os.path.isfile(os.path.join(self.pkg.path, "CMakeLists.txt")): - parser.parse(os.path.join(self.pkg.path, "CMakeLists.txt")) - node_name_dict = deepcopy(parser.executables) - node_name_dict.update(deepcopy(parser.libraries)) - for target in node_name_dict.values(): - print("INFO: Found artifact: "+target.output_name) - if (self.args.a): - node_name = target.output_name - node = Node(node_name, self.pkg, rosname=RosName(node_name)) - for file_in in target.files: - full_path = file_in - relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] - file_name = full_path.rsplit('/', 1)[-1] - source_file = SourceFile(file_name, relative_path , self.pkg) - node.source_files.append(source_file) - else: - if target.output_name == node_name: - node = Node(node_name, self.pkg, rosname=RosName(node_name)) - for file_in in target.files: - full_path = file_in - relative_path = full_path.replace(self.pkg.path+"/","").rpartition("/")[0] - file_name = full_path.rsplit('/', 1)[-1] - source_file = SourceFile(file_name, relative_path , self.pkg) - node.source_files.append(source_file) - else: - continue - if node.language == "cpp": - parser = CppAstParser(workspace = ws) - analysis = RoscppExtractor(self.pkg, ws) - if node.language == "python": - parser = PyAstParser(workspace = ws) - analysis = RospyExtractor(self.pkg, ws) - #node.source_tree = parser.global_scope - for sf in node.source_files: - try: - if parser.parse(sf.path) is not None: - # ROS MODEL EXTRACT PRIMITIVES - if node.language == "python": - node_name=node_name.replace(".py","") - RosModel_node=RosModelMetamodel.Node(node_name) - try: - self.extract_primitives(node, parser, analysis, RosModel_node, roscomponent, pkg_name, node_name, node_name) - # SAVE ROS MODEL - ros_model = RosModelGenerator() - ros_model.create_model_from_node(self.pkg.name,node_name, RosModel_node, self.args.repo, self.pkg_type) - print("Save model in:") - print(self.args.model_path+"/"+node_name+".ros") - model_str = ros_model.generate_ros_model(self.args.model_path+"/"+node_name+".ros") - except error: - print("The interfaces can't be extracted "+error) - else: - print("The model couldn't be extracted") - except: - pass - if rossystem is not None and roscomponent is not None: - rossystem.add_component(roscomponent) - if self.args.output: - print(model_str) - - def transform_type(self, param_type): - if os.environ.get("ROS_VERSION") == "2": - param_type=str(param_type) - param_type=param_type[param_type.find("[")+1:param_type.find("]")] - if param_type == 'double': - return 'Double' - elif param_type == 'bool': - return 'Boolean' - elif param_type == 'int' or param_type == 'long': - return 'Integer' - elif (param_type == 'str' or 'basic_string' in param_type or param_type == 'std::string'): - return 'String' - #elif param_type == 'yaml': - #return 'Struct' - #elif 'std::vector' in param_type: - #return 'List' - else: - return None - - - def extract_primitives(self, node, parser, analysis, RosModel_node, roscomponent, pkg_name, node_name, art_name): - gs = parser.global_scope - node.source_tree = parser.global_scope - if os.environ.get("ROS_VERSION") == "1": - if node.language == "cpp": - #print(CodeQuery(gs).all_calls.get()) - for call in (CodeQuery(gs).all_calls.where_name("SimpleActionServer").get()): - if len(call.arguments) > 0: - name = analysis._extract_action(call) - action_type = analysis._extract_action_type(call).split("_<",1)[0] - RosModel_node.add_action_server(name,action_type.replace("/",".")) - #roscomponent.add_interface(name,"actsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("SimpleActionClient").get()): - if len(call.arguments) > 0: - name = analysis._extract_action(call) - action_type = analysis._extract_action_type(call).split("_<",1)[0] - RosModel_node.add_action_client.append(name,action_type.replace("/",".")) - #roscomponent.add_interface(name,"actcls", str(pkg_name)+"."+str(art_name)+"."+str(node_name)+"."+str(name)) - for call in (CodeQuery(gs).all_calls.where_name("advertise").where_result("ros::Publisher").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - RosModel_node.add_publisher(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"pubs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("subscribe").where_result("ros::Subscriber").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - RosModel_node.add_subscriber(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"subs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("advertiseService").where_result("ros::ServiceServer").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call) - srv_type = analysis._extract_message_type(call) - RosModel_node.add_service_server(name, srv_type.replace("/",".").replace("Request","")) - #roscomponent.add_interface(name,"srvsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) - for call in (CodeQuery(gs).all_calls.where_name("serviceClient").where_result("ros::ServiceClient").get()): - if len(call.arguments) > 1: - name = analysis._extract_topic(call) - srv_type = analysis._extract_message_type(call) - RosModel_node.add_service_client(name, srv_type.replace("/",".").replace("Response","")) - #roscomponent.add_interface(name,"srvcls", pkg_name+"."+art_name+"."+node_name+"."+name) - - #PARAMETERS nhg:this needs review - nh_prefix = "c:@N@ros@S@NodeHandle@" - gets = ("getParam", "getParamCached", "param") - reads = gets + ("hasParam", "searchParam") - sets = ("setParam",) - writes = sets + ("deleteParam",) - for call in CodeQuery(gs).all_calls.where_name(reads).get(): - if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): - param_type = default_value = None - param_name = analysis._extract_topic(call) - if call.name in gets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - if call.name == "param": - if len(call.arguments) > 2: - default_value = analysis._extract_param_value( call, arg_pos=2) - elif len(call.arguments) == 2: - default_value = analysis._extract_param_value( call, arg_pos=1) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - - for call in CodeQuery(gs).all_calls.where_name(writes).get(): - if (call.full_name.startswith("ros::NodeHandle") or (isinstance(call.reference, str) and call.reference.startswith(nh_prefix))): - param_type = value = None - param_name = analysis._extract_topic(call) - if len(call.arguments) >= 2 and call.name in sets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - value = analysis._extract_param_value(call, arg_pos=1) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - ros_prefix = "c:@N@ros@N@param@" - gets = ("get", "getCached", "param") - reads = gets + ("has",) - sets = ("set",) - writes = sets + ("del",) - for call in CodeQuery(gs).all_calls.where_name(reads).get(): - if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): - param_type = default_value = None - param_name = analysis._extract_topic(call) - if call.name == "param": - if call.name in gets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - if len(call.arguments) > 2: - default_value = analysis._extract_param_value(call, arg_pos=2) - elif len(call.arguments) == 2: - default_value = analysis._extract_param_value(call, arg_pos=1) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - for call in CodeQuery(gs).all_calls.where_name(writes).get(): - if (call.full_name.startswith("ros::param") or (isinstance(call.reference, str) and call.reference.startswith(ros_prefix))): - param_type = value = None - if len(call.arguments) >= 2 and call.name in sets: - param_type = self.transform_type(analysis._extract_param_type(call.arguments[1])) - value = analysis._extract_param_value(call, arg_pos=1) - param_name = analysis._extract_topic(call) - if not ((default_value is None or default_value == "") and param_type is None): - RosModel_node.add_parameter(param_name, default_value, param_type, None) - - #PYTHON nhg:this needs review - if node.language == "python": - msgs_list = [] - pkgs_list = [] - for imp_name in parser.imported_names_list: - s = str(imp_name) - if "msg" in s or "srv" in s: - ss = s.split(".") - if len(ss) < 2: - continue - if ss[-1] == "msg" or ss[-1] == "srv": - pkgs_list.append(ss[0]) - elif ss[1] == "msg" or ss[1] == "srv": - msgs_list.append((ss[0], ss[2])) - else: - log.debug(("Python import with 'msg' or 'srv', " - "but unable to process it: ") - + s) - for call in CodeQuery(gs).all_calls.get(): - if "rospy.Publisher" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) - #queue_size = analysis._extract_queue_size(call ) - RosModel_node.add_publisher(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"pubs", pkg_name+"."+art_name+"."+node_name+"."+name) - if "rospy.Subscriber" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call, '', msgs_list, pkgs_list) - #queue_size = analysis._extract_queue_size(call ) - RosModel_node.add_subscriber(name, msg_type.replace("/",".")) - #roscomponent.add_interface(name,"subs", pkg_name+"."+art_name+"."+node_name+"."+name) - if "rospy.Service" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call, '', msgs_list) - RosModel_node.add_service_server(name, srv_type.replace("/",".").replace("Request","")) - #roscomponent.add_interface(name,"srvsrvs", pkg_name+"."+art_name+"."+node_name+"."+name) - if "rospy.ServiceProxy" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call, '', msgs_list) - RosModel_node.add_service_client(name, srv_type.replace("/",".").replace("Response","")) - #roscomponent.add_interface(name,"srvcls", pkg_name+"."+art_name+"."+node_name+"."+name) - #if "rospy.get_param" in str(call): - # print("PARAM GET:") - # print(call) - if "rospy.set_param" in str(call): - param_name = analysis._extract_topic(call, topic_pos=0) - param_type = default_value = None - default_value = resolve_expression(call.arguments[1]) - RosModel_node.add_parameter(param_name.replace(".","/"), default_value , param_type, None) - - if os.environ.get("ROS_VERSION") == "2": - #ROS2 - if node.language == "cpp": - for call in (CodeQuery(gs).all_calls.get()): - if "Publisher" in str(call): - #print(call) - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or msg_type!="?": - RosModel_node.add_publisher(name, msg_type.replace("/",".").replace(".msg","")) - if "Subscription" in str(call): # Subscription or Subscriber? - #print(call) - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - msg_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or msg_type!="?": - RosModel_node.add_subscriber(name, msg_type.replace("/",".").replace(".msg","")) - if "Service" in str(call) and "::srv::" in str(call): #or? - #print(call) - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - print(name + " " + srv_type) - if name!="?" or srv_type!="?": - RosModel_node.add_service_server(name, srv_type.replace("/",".").replace(".srv","")) - if "Client" in str(call) and "::srv::" in str(call): - #print(call) - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - srv_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - print(name + " " + srv_type) - if name!="?" or srv_type!="?": - RosModel_node.add_service_client(name, srv_type.replace("/",".").replace(".srv","")) - if "Client" in str(call) and "::action::" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - act_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or act_type!="?": - RosModel_node.add_action_client(name, act_type.replace("/",".").replace(".action","")) - if "Server" in str(call) and "::action::" in str(call): - if len(call.arguments) > 1: - name = analysis._extract_topic(call, topic_pos=0) - act_type = analysis._extract_message_type(call) - queue_size = analysis._extract_queue_size(call, queue_pos=1) - if name!="?" or act_type!="?": - RosModel_node.add_action_server(name, act_type.replace("/",".").replace(".action","")) - #PARAMETERS ROS2 - params=[] - written_params=[] - for call in (CodeQuery(gs).all_calls.get()): - param_prefix = "c:@N@rclcpp@S@Node@F@" - declare_params = ("get_parameter","declare_parameter") - if (call.full_name.startswith("rclcpp::Node") or (isinstance(call.reference, str)) and call.reference.startswith(param_prefix)): - param_type = default_value = None - if(call.name in declare_params) and len(call.arguments) > 1: - param_name = analysis._extract_topic(call, topic_pos=0) - if len(call.arguments) == 2: - param_type= self.transform_type(resolve_expression(call.arguments[1])) - params.append((param_name, param_type)) - elif len(call.arguments) > 2 and not ('[rclcpp::ParameterValue] (default)' in str(resolve_expression(call.arguments[1]))): - default_value = resolve_expression(call.arguments[1]) - param_type = self.transform_type(resolve_expression(call)) - params.append((param_name, param_type, default_value)) - for parameter in params: - param_name_ = param_type_ = default_value_ = None - if len(parameter) > 2: - param_name_, param_type_, default_value_ = parameter - if not ((default_value_ is None or default_value_ == "") and param_type_ is None): - RosModel_node.add_parameter(param_name_.replace(".","/"), default_value_ , param_type_, None) - written_params.append(param_name_) - elif len(parameter) == 2: - param_name_, param_type_ = parameter - if not (param_type_ is None) and not (param_name_ in written_params): - RosModel_node.add_parameter(param_name_.replace(".","/"), default_value_ , param_type_, None) - - def parse_arg(self): - parser = argparse.ArgumentParser() - mutually_exclusive = parser.add_mutually_exclusive_group() - mutually_exclusive.add_argument('--node', '-n', help="node analyse", action='store_true') - mutually_exclusive.add_argument('--launch', '-l', help="launch analyse", action='store_true') - parser.add_argument('--model-path', help='path to the folder in which the model files should be saved', - default='./', - nargs='?', const='./') - parser.add_argument('--output', help='print the model output') - parser.add_argument('--package', required=True, dest='package_name') - parser.add_argument('--name', required=False, dest='name') - parser.add_argument('--ws', required=True, dest='worspace_path') - parser.add_argument('--path-to-src', required=False, dest='path_to_src') - parser.add_argument('--repo', required=False, dest='repo') - parser.add_argument('-a', action='store_true') - parser.add_argument('--clang-version', required=True, dest='clang_version') - self.args = parser.parse_args() - - -class RosInterface: - def __init__(self, name, ref): - self.name = name - self.ref = ref - -class ros_component: - def __init__(self, name, ns): - self.name = ns+name if ns else name - self.ns = ns - self.pubs = [] - self.subs = [] - self.srvsrvs = [] - self.srvcls = [] - self.actsrvs = [] - self.actcls = [] - def add_interface(self, name, interface_type, ref): - if interface_type == "pubs": - self.pubs.append(RosInterface(name,ref)) - if interface_type == "subs": - self.subs.append(RosInterface(name,ref)) - if interface_type == "srvsrvs": - self.srvsrvs.append(RosInterface(name,ref)) - if interface_type == "srvcls": - self.srvcls.append(RosInterface(name,ref)) - if interface_type == "actsrvs": - self.actsrvs.append(RosInterface(name,ref)) - if interface_type == "actcls": - self.actcls.append(RosInterface(name,ref)) - -def main(argv = None): - extractor = RosExtractor() - if extractor.launch(): - return 0 - return 1 - -if __name__== "__main__": - main()