Skip to content

Commit

Permalink
Merge pull request #76 from sourcebots/kch-support
Browse files Browse the repository at this point in the history
Add support for KCH leds
  • Loading branch information
WillB97 authored Jul 14, 2024
2 parents 62baac0 + 2f36840 commit f911fc6
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 32 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ jobs:
MYPY: venv/bin/mypy
steps:
- uses: actions/checkout@v3
- name: Set up Python 3.7
- name: Set up Python 3.8
uses: actions/setup-python@v3
with:
python-version: 3.7
python-version: 3.8
- name: Set up virtualenv
run: python -m venv venv
- name: Install dependencies
Expand Down
116 changes: 88 additions & 28 deletions runusb/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@
# This will be populated if we have the config file
# url format: mqtt[s]://[<username>[:<password>]@]<host>[:<port>]/<topic_root>
MQTT_URL = None
MQTT_TOPIC_ROOT = ''
MQTT_CLIENT = None
MQTT_CONFIG_FILE = '/etc/sbot/mqtt.conf'


Expand Down Expand Up @@ -86,37 +88,84 @@ class Mountpoint(NamedTuple):
)


class LedStatus(Enum):
NoUSB = (False, False, False) # Off
Running = (False, False, True) # Blue
Killed = (True, False, True) # Magenta
Finished = (False, True, False) # Green
Crashed = (True, False, False) # Red


class LEDController():
@unique
class LEDs(IntEnum):
RED = 2
YELLOW = 3
GREEN = 4
BOOT_100 = 13
CODE = 11
COMP = 16
WIFI = 8
STATUS_RED = 26
STATUS_GREEN = 20
STATUS_BLUE = 21

def __init__(self) -> None:
if IS_PI:
LOGGER.debug("Configuring LED controller")
self._register_exit()
atexit.register(GPIO.cleanup) # type: ignore[attr-defined]
GPIO.setmode(GPIO.BCM)
GPIO.setup([led.value for led in self.LEDs], GPIO.OUT, initial=GPIO.LOW)

def red(self) -> None:
def _register_exit(self) -> None:
"""
Ensure `atexit` triggers on `SIGTERM`.
> The functions registered via [`atexit`] are not called when the program is
killed by a signal not handled by Python
"""

if signal.getsignal(signal.SIGTERM) != signal.SIG_DFL:
# If a signal handler is already present for SIGTERM,
# this is sufficient for `atexit` to trigger, so do nothing.
return

def handle_signal(handled_signum: int, frame) -> None:
"""Semi-default signal handler for SIGTERM, enough for atexit."""
USERCODE_LOGGER.error(signal.strsignal(handled_signum))
exit(128 + handled_signum) # 143 for SIGTERM

# Add the null-ish signal handler
signal.signal(signal.SIGTERM, handle_signal)

def mark_start(self) -> None:
if IS_PI:
GPIO.output(self.LEDs.BOOT_100, GPIO.HIGH)

def set_comp(self, value: bool) -> None:
if IS_PI:
GPIO.output(self.LEDs.COMP, GPIO.HIGH if value else GPIO.LOW)

def set_code(self, value: bool) -> None:
if IS_PI:
GPIO.output(self.LEDs.RED, GPIO.HIGH)
GPIO.output(self.LEDs.YELLOW, GPIO.LOW)
GPIO.output(self.LEDs.GREEN, GPIO.LOW)
GPIO.output(self.LEDs.CODE, GPIO.HIGH if value else GPIO.LOW)

def yellow(self) -> None:
def set_wifi(self, value: bool) -> None:
if IS_PI:
GPIO.output(self.LEDs.RED, GPIO.LOW)
GPIO.output(self.LEDs.YELLOW, GPIO.HIGH)
GPIO.output(self.LEDs.GREEN, GPIO.LOW)
GPIO.output(self.LEDs.WIFI, GPIO.HIGH if value else GPIO.LOW)

def green(self) -> None:
def set_status(self, value: LedStatus) -> None:
if IS_PI:
GPIO.output(self.LEDs.RED, GPIO.LOW)
GPIO.output(self.LEDs.YELLOW, GPIO.LOW)
GPIO.output(self.LEDs.GREEN, GPIO.HIGH)
GPIO.output(self.LEDs.STATUS_RED, GPIO.HIGH if value.value[0] else GPIO.LOW)
GPIO.output(self.LEDs.STATUS_GREEN, GPIO.HIGH if value.value[1] else GPIO.LOW)
GPIO.output(self.LEDs.STATUS_BLUE, GPIO.HIGH if value.value[2] else GPIO.LOW)

# Also send the status over MQTT
if MQTT_CLIENT is not None:
MQTT_CLIENT.publish(
f'{MQTT_TOPIC_ROOT}/state',
json.dumps({"state": value.name}),
qos=1,
retain=True,
)


LED_CONTROLLER = LEDController()
Expand Down Expand Up @@ -198,7 +247,9 @@ def close(self) -> None:
class RobotUSBHandler(USBHandler):
def __init__(self, mountpoint_path: str) -> None:
self._setup_logging(mountpoint_path)
LED_CONTROLLER.yellow()
LED_CONTROLLER.set_code(True)
LED_CONTROLLER.set_status(LedStatus.Running)

env = dict(os.environ)
env["SBOT_METADATA_PATH"] = MOUNTPOINT_DIR
if MQTT_URL is not None:
Expand All @@ -223,15 +274,19 @@ def __init__(self, mountpoint_path: str) -> None:
target=self._log_output, args=(self.process.stdout,))
self.log_thread.start()

def close(self) -> None:
def cleanup(self) -> None:
self._send_signal(signal.SIGTERM)
try:
# Wait for the process to exit
self.process.communicate(timeout=5)
except subprocess.TimeoutExpired:
# The process did not exit after 5 seconds, so kill it.
self._send_signal(signal.SIGKILL)
self._set_leds()

def close(self) -> None:
self.cleanup()
LED_CONTROLLER.set_status(LedStatus.NoUSB)
LED_CONTROLLER.set_code(False)
USERCODE_LOGGER.removeHandler(self.handler)

def _send_signal(self, sig: int) -> None:
Expand All @@ -245,8 +300,10 @@ def _watch_process(self) -> None:
self.process.wait()
if self.process.returncode != 0:
USERCODE_LOGGER.warning(f"Process exited with code {self.process.returncode}")
LED_CONTROLLER.set_status(LedStatus.Crashed)
else:
USERCODE_LOGGER.info("Your code finished successfully.")
LED_CONTROLLER.set_status(LedStatus.Finished)

process_lifetime = time.time() - self.process_start_time

Expand All @@ -256,7 +313,7 @@ def _watch_process(self) -> None:
time.sleep(1 - process_lifetime)

# Start clean-up
self.close()
self.cleanup()

def _setup_logging(self, log_dir: str) -> None:
self._rotate_old_logs(log_dir)
Expand Down Expand Up @@ -284,12 +341,6 @@ def _log_output(self, pipe: IO[str]) -> None:
USERCODE_LOGGER.log(USERCODE_LEVEL, line.rstrip('\n'))
LOGGER.info('Process output finished')

def _set_leds(self) -> None:
if self.process.returncode == 0:
LED_CONTROLLER.green()
else:
LED_CONTROLLER.red()

def _rotate_old_logs(self, log_dir: str) -> None:
"""
Add a suffix to the old log file, if it exists.
Expand All @@ -309,10 +360,12 @@ def _rotate_old_logs(self, log_dir: str) -> None:

class MetadataUSBHandler(USBHandler):
def __init__(self, mountpoint_path: str) -> None:
pass # Nothing to do.
# NOTE the comp LED just represents the presence of a comp mode USB
# not whether comp mode is enabled
LED_CONTROLLER.set_comp(True)

def close(self) -> None:
pass # Nothing to do.
LED_CONTROLLER.set_comp(False)


class AutorunProcessRegistry(object):
Expand Down Expand Up @@ -414,7 +467,7 @@ def read_mqtt_config_file() -> MQTTVariables | None:


def setup_usercode_logging() -> None:
global REL_TIME_FILTER
global REL_TIME_FILTER, MQTT_CLIENT, MQTT_TOPIC_ROOT
REL_TIME_FILTER = RelativeTimeFilter()
USERCODE_LOGGER.addFilter(REL_TIME_FILTER)
USERCODE_LOGGER.setLevel(logging.DEBUG)
Expand All @@ -432,7 +485,11 @@ def setup_usercode_logging() -> None:
username=mqtt_config.username,
password=mqtt_config.password,
connected_topic=f"{mqtt_config.topic_prefix}/connected",
connected_callback=lambda: LED_CONTROLLER.set_wifi(True),
disconnected_callback=lambda: LED_CONTROLLER.set_wifi(False),
)
MQTT_CLIENT = handler.mqtt
MQTT_TOPIC_ROOT = mqtt_config.topic_prefix

handler.setLevel(logging.INFO)
handler.setFormatter(TieredFormatter(
Expand All @@ -452,6 +509,9 @@ def main():

registry = AutorunProcessRegistry()

LED_CONTROLLER.mark_start()
LED_CONTROLLER.set_status(LedStatus.NoUSB)

# Initial pass (in case an autorun FS is already mounted)
registry.update_filesystems(fstab_reader.read())

Expand Down
4 changes: 2 additions & 2 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,11 @@ url = https://github.com/sourcebots/runusb
python_requires = >=3.8
packages = find:
install_requires =
logger-extras==0.4.0
logger-extras==0.4.1
rpi.GPIO==0.7.1

[options.extras_require]
mqtt = logger-extras[mqtt]==0.4.0
mqtt = logger-extras[mqtt]==0.4.1

[options.entry_points]
console_scripts =
Expand Down

0 comments on commit f911fc6

Please sign in to comment.