-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathvolume_thread.py
43 lines (38 loc) · 1.47 KB
/
volume_thread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
import sys
import time
import serial
import logging
import utils
from pathlib import Path
from control import Control
from PyQt5.QtCore import QThread
from PyQt5.QtWidgets import QMessageBox
# Module-level logger obtained from the project's ``utils`` helper; used by
# VolumeThread for its status messages.
logger = utils.get_logger()
class VolumeThread(QThread):
    """Worker thread that reads volume values from a serial port.

    Each line received on the serial connection is expected to look like
    ``"<val>|<val>|<val>|<val>|<val>"``. The parsed floats are handed to
    ``Control.set_volume``. The loop runs until ``self.running`` is set to a
    falsy value.
    """

    # Seconds to wait between checks while ``control.sessions`` is still None.
    # Without a wait the loop would spin at full CPU until sessions appear.
    IDLE_POLL_INTERVAL = 0.1

    def __init__(self, mapping_dir=None):
        """Create the control object and open the serial connection.

        Args:
            mapping_dir: Directory that contains ``mapping.txt``. When
                ``None`` (the default) the current working directory is used.

        Raises:
            serial.SerialException: If the serial port cannot be opened. A
                critical message box is shown before the exception is
                re-raised.
        """
        super().__init__()
        logger.info("Creating volume thread.")
        self.running = True

        # Honor the caller-supplied directory; fall back to the working
        # directory so existing callers that pass nothing are unaffected.
        base_dir = Path.cwd() if mapping_dir is None else Path(mapping_dir)
        self.control = Control(base_dir / 'mapping.txt')

        logger.info("Setting up serial communication.")
        try:
            self.arduino = serial.Serial(self.control.port, self.control.baudrate, timeout=0.1)
        except serial.SerialException:
            QMessageBox.critical(
                None,
                "Application already running",
                "The application crashed because the serial connection is busy. This may mean "
                "that another instance is already running. Please check the system tray or the "
                "task manager.",
            )
            raise
        else:
            logger.info(self.arduino)

    @staticmethod
    def _parse_line(raw):
        """Convert one raw serial line into a list of floats.

        Args:
            raw: Bytes as returned by ``readline()``; may be empty or lack a
                line terminator when the read timed out.

        Returns:
            A list of floats, or ``None`` when the line is empty.

        Raises:
            ValueError: If the bytes are not valid UTF-8 or a field is not a
                number (``UnicodeDecodeError`` is a ``ValueError`` subclass).
        """
        # strip() removes the trailing '\r\n' when present, without chopping
        # data characters off a fragment that has no terminator.
        text = raw.decode("utf-8").strip()
        if not text:
            return None
        return [float(val) for val in text.split("|")]

    def run(self):
        """Poll the serial port and forward parsed values until stopped."""
        logger.info("Entering thread loop.")
        while self.running:
            if self.control.sessions is None:
                # Nothing to control yet; wait instead of busy-looping.
                time.sleep(self.IDLE_POLL_INTERVAL)
                continue

            # Data is formatted as "<val>|<val>|<val>|<val>|<val>"
            raw = self.arduino.readline()
            try:
                values = self._parse_line(raw)
            except ValueError:
                # A garbled or partial line must not kill the thread; skip it
                # and wait for the next one.
                logger.warning("Ignoring malformed serial data: %r" % (raw,))
                continue

            if values:
                self.control.set_volume(values)