-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathamr_testbed_integration_server.py
146 lines (117 loc) · 4.63 KB
/
amr_testbed_integration_server.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
import datetime
import json
import time
import requests
from flask import Flask, jsonify, request
from testbed_config import (AMR, AMR_OFFBOARD_INFRA_REST_API_BASE_URL, PORT,
SENTINEL_DOCK_ID, TaskStatus, WorkCell,
parse_amr_resource_name_to_enum,
parse_location_name_to_enum)
active_missions = {
AMR.AMR_1: None,
AMR.AMR_2: None,
}
# creating a Flask app
app = Flask(__name__)
########### Helper functions ###########
# REST API call to AMR offboard infrastructure (backend) to create a new AMR mission
def create_new_amr_mission(amr, goal):
data = {
"status": TaskStatus.ENQUEUED.value, # Set status to ENQUEUED using integer value
"start": SENTINEL_DOCK_ID, # Temporarily setting this to 0 as it is not currently used on the backend
"goal": goal.value, # Use integer value of the enum
"enqueue_time": datetime.datetime.now().isoformat(), # Set enqueue time to now
# Leave the following fields null/empty
"amr_id": amr.value,
"material_transport_task_chain_id": None,
"assembly_workflow_id": None,
"start_time": None,
"end_time": None,
}
url = f"{AMR_OFFBOARD_INFRA_REST_API_BASE_URL}/amrmissions/"
headers = {"Content-Type": "application/json"}
response = requests.post(url, data=json.dumps(data), headers=headers)
return response
def parse_mission_request(mission):
# Check if the mission is valid
if mission["msgType"] != "StartTask":
return None
# Check whether the mission contains an AMR resource
matched_amr_resource = None
for amr_resource_name in parse_amr_resource_name_to_enum.keys():
if amr_resource_name in mission["resources"]:
matched_amr_resource = parse_amr_resource_name_to_enum[amr_resource_name]
break
if matched_amr_resource is None:
return None
# Match goal location
matched_goal_location = parse_location_name_to_enum[mission["location"]]
return {
"amr": matched_amr_resource,
"goal": matched_goal_location,
}
def generate_mission_completion_payload(amr):
return {
"msgType": "EndTask",
"taskId": active_missions[amr],
"name": active_missions[amr],
"outcome": "success",
}
# Flask Server Views
# Is called by the executor to enqueue a new AMR mission.
# Sample JSON request:
# {
# "msgType": "StartTask",
# "taskId": 1,
# "name": "moveKitToArm",
# "resources": [
# "amr2"
# ],
# "structureType": “Heart",
# "location": "Robot-Arm-2"
# }
@app.route("/execution", methods=["POST"])
def enqueue_new_amr_mission():
# Get the mission from the executor
potential_mission = request.get_json()
mission_to_enqueue = parse_mission_request(potential_mission)
if mission_to_enqueue is not None:
create_new_amr_mission(mission_to_enqueue["amr"], mission_to_enqueue["goal"])
active_missions[mission_to_enqueue["amr"]] = potential_mission
print(
f"{mission_to_enqueue['amr'].name} assigned a mission to go to {mission_to_enqueue['goal'].name}:\n {potential_mission}"
)
response_data = {'message': 'AMR mission created.'}
return jsonify(response_data), 201
# Is called by the ROS offboard comms node to signal mission completion.
# This function forwards the mission completion signal to the executor.
# Sample json request received from the ROS offboard comms node:
# {
# "amr": AMR.AMR_1,
# }
# Sample json POST request to send to the executor:
# {
# "msgType": "EndTask",
# "taskId": 1,
# "name": "moveKitToArm",
# "outcome": "success"
# }
@app.route("/forward_mission_completion", methods=["POST"])
def forward_mission_completion():
mission_completion_info = request.get_json()
amr_val = mission_completion_info["amr_val"]
print(f"{AMR(amr_val).name} has completed its mission.")
executor_payload = generate_mission_completion_payload(
mission_completion_info["amr"]
)
print("Responding to executor with: ", executor_payload)
url = f"{TESTBED_EXECUTOR_SERVER_URL}/amrmissions/"
headers = {"Content-Type": "application/json"}
requests.post(url, data=json.dumps(executor_payload), headers=headers)
# TODO[Shobhit/Zack]: It would be good style for the server to repond with acknowledgement of the mission completion
# Set amr's active mission to None
active_missions[amr] = None
response_data = {'message': 'Forwarded mission completion signal to executor.'}
return jsonify(response_data), 200
if __name__ == "__main__":
app.run(host="0.0.0.0", port=PORT, debug=True)