-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathgps_node.py
126 lines (109 loc) · 3.76 KB
/
gps_node.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
#!/usr/bin/env python3
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import NavSatFix
import serial
from datetime import datetime
class GPS_Node(Node):
def __init__(self):
super().__init__('gps_node')
self.serial_port = serial.Serial('/dev/ttyACM0', baudrate=9600, timeout=0.5)
#uart = busio.UART(board.TX, board.RX, baudrate=9600, timeout=0.5)
# Enable SBAS
enable_sbas(self.serial_port)
self.publisher_ = self.create_publisher(NavSatFix, 'gps/fix', 10)
self.timer = self.create_timer(1.0, self.timer_callback)
def timer_callback(self):
# Enable SBAS
enable_sbas(self.serial_port)
lines = []
for _ in range(5): # Read up to 5 lines
try:
line = self.serial_port.readline().decode('ascii', errors='replace').strip()
if line: # Only add non-empty lines
lines.append(line)
except Exception as e:
self.get_logger().error(f"Error reading from serial: {e}")
break
latitude = 0.0
longitude = 0.0
altitude = 0.0
for line in lines:
print(line)
if line.startswith('$GPRMC'):
[position_status, latitude, longitude] = process_gprmc(line)
elif line.startswith('$GPVTG'):
process_gpvtg(line)
elif line.startswith('$GPGSA'):
process_gpgga(line)
elif line.startswith('$GPGGA'):
altitude = process_gptxt(line)
elif line.startswith('$GPGSV'):
process_gpgsv(line)
elif line.startswith('$GPGLL'):
process_gpgll(line)
else:
print("Unsupported NMEA sentence!!!")
if latitude == None:
latitude = 0.0
if longitude == None:
longitude = 0.0
if altitude == None:
altitude = 0.0
msg = NavSatFix()
msg.latitude = latitude
msg.longitude = longitude
msg.altitude = altitude
self.publisher_.publish(msg)
self.get_logger().info(f'Published GPS Data - Lat: {latitude}, Lon: {longitude}, Alt: {altitude}')
def nmea_to_decimal(nmea_str, direction):
degrees = float(nmea_str[:2])
minutes = float(nmea_str[2:])
decimal = degrees + minutes / 60.0
if direction in ['S', 'W']:
decimal *= -1
return decimal
def process_gprmc(line):
parts = line.split(',')
position_status = parts[2]
latitude = nmea_to_decimal(parts[3],parts[4]) if parts[3] and parts[4] else 0.0
longitude = nmea_to_decimal(parts[5],parts[6]) if parts[5] and parts[6] else 0.0
speed = parts[7]
heading = parts[8]
#date_str = parts[9]
#date = datetime.strptime(date_str, '%m%d%Y').date()
return [position_status, latitude, longitude]
def process_gpvtg(line):
pass
def process_gpgga(line):
parts = line.split(',')
altitude = float(parts[9]) if parts[9] != '' else 0.0
if parts[10] == 'M':
pass
elif parts[10] == 'KM':
altitude *= 1000
return altitude
def process_gptxt(line):
pass
def process_gpgsv(line):
pass
def process_gpgll(line):
pass
def enable_sbas(serial_port):
"""
Sends the PMTK command to enable SBAS on the BN-180 GPS module.
"""
SBAS_ENABLE_COMMAND = b"$PMTK313,1*2E\r\n"
try:
serial_port.write(SBAS_ENABLE_COMMAND)
print("SBAS enable command sent.")
except Exception as e:
print(f"Failed to send SBAS command: {e}")
def main(args=None):
rclpy.init(args=args)
node = GPS_Node()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()