-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathGameDataGetter.py
53 lines (45 loc) · 1.21 KB
/
GameDataGetter.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
import socket
import struct
import threading
import time
class GameDataGetter:
# GameData info indexes
I_SPEED = 0
I_DISTANCE = 1
I_X = 2
I_Y = 3
I_Z = 4
I_STEER = 5
I_GAS = 6
I_BRAKE = 7
I_FINISH = 8
I_GEAR = 9
I_RPM = 10
I_DX = 11
I_DY = 12
I_DZ = 13
# GameData array length
SIZE = 14
def __init__(self, localhost = "127.0.0.1", port = 9000):
self.game_data = [0.0]*GameDataGetter.SIZE
self.localhost = localhost
self.port = port
# start the thread
self.thread = threading.Thread(target=self._thread_function, daemon=True)
self.thread.start()
# wait for connection
time.sleep(0.2)
def _fetch_data(self, s):
i = 0
while i < GameDataGetter.SIZE:
self.game_data[i] = struct.unpack(b'@f', s.recv(4))[0]
i += 1
def _thread_function(self):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect((self.localhost, self.port))
while True:
self._fetch_data(s)
if __name__ == "__main__":
data_getter = GameDataGetter()
while True:
print(data_getter.game_data[GameDataGetter.I_DX])