diff --git a/core/core-linux.py b/core/core-linux.py index be0ebe2..48e3efc 100644 --- a/core/core-linux.py +++ b/core/core-linux.py @@ -14,7 +14,6 @@ @ctxLinux.action_class("user") class UserActions: - def toggle_reader(): """Toggles the screen reader on and off""" actions.user.toggle_orca() @@ -44,7 +43,7 @@ def tts(text: str, interrupt: bool = True): def espeak(text: str): """Text to speech with a robotic/narrator voice""" - rate = settings.get("user.tts_speed", 0) + rate = settings.get("user.tts_speed") # convert -10 to 10 to -100 to 100 rate = rate * 10 # text = remove_special(text) diff --git a/core/core-windows.py b/core/core-windows.py index d41a9d8..a7db87b 100644 --- a/core/core-windows.py +++ b/core/core-windows.py @@ -114,7 +114,7 @@ def base_win_tts(text: str, interrupt: bool): """Base function for windows tts. We expose this so we can share the speaker object across files. We don't want it to get overridden by the other tts functions""" - speaker.set_rate(settings.get("user.tts_speed", 0)) + speaker.set_rate(settings.get("user.tts_speed")) speaker.set_volume(settings.get("user.tts_volume", 50)) speaker.speak(text, interrupt) diff --git a/core/screenreader_ipc/ipc_client.py b/core/screenreader_ipc/ipc_client.py index 17439a8..420f816 100644 --- a/core/screenreader_ipc/ipc_client.py +++ b/core/screenreader_ipc/ipc_client.py @@ -1,33 +1,104 @@ -from talon import Module, Context, actions, settings +from talon import Module, Context, actions, settings, cron import os import ipaddress import json import socket import threading -from typing import Tuple -from .ipc_schema import IPC_COMMAND +from typing import Tuple, assert_never, Optional +from .ipc_schema import ( + IPC_COMMAND, + IPCServerResponse, + IPCClientResponse, + ServerSpec, + ServerStatusResult, + ResponseBundle, +) + mod = Module() lock = threading.Lock() +def handle_ipc_result( + client_response: IPCClientResponse, + server_response: IPCServerResponse, +) -> list[Tuple[IPC_COMMAND, Optional[any]]]: + """ + Sanitize the response from the screenreader server + and return just the commands and their return values + if present + """ + if settings.get("user.addon_debug"): + print(f"Received responses\n{client_response=}\n{server_response=}") + + match client_response, server_response: + case ( + ( + IPCClientResponse.NO_RESPONSE + | IPCClientResponse.TIMED_OUT + | IPCClientResponse.GENERAL_ERROR, + _, + ) as error + ): + raise RuntimeError( + f"Clientside {error=} communicating with screenreader extension" + ) + case (IPCClientResponse.SUCCESS, _): + # empty case is here for exhaustiveness + pass + + for cmd, value, status in zip( + server_response["processedCommands"], + server_response["returnedValues"], + server_response["statusResults"], + ): + match status: + case ServerStatusResult.SUCCESS: + # empty case is here for exhaustiveness + pass + case ServerStatusResult.INVALID_COMMAND_ERROR: + raise ValueError(f"Invalid command '{cmd}' sent to screenreader") + case ServerStatusResult.JSON_ENCODE_ERROR: + raise ValueError( + "Invalid JSON payload sent from client to screenreader" + ) + case ( + ( + ServerStatusResult.INTERNAL_SERVER_ERROR + | ServerStatusResult.RUNTIME_ERROR + ) as error + ): + raise RuntimeError(f"{error} processing command '{cmd}'") + case _: + assert_never((cmd, value, status)) + + return list( + zip(server_response["processedCommands"], server_response["returnedValues"]) + ) + + @mod.action_class class Actions: def addon_server_endpoint() -> Tuple[str, str, str]: """Returns the address, port, and valid commands for the addon server""" - def send_ipc_commands(commands: list[IPC_COMMAND]): - """Sends a command or commands to the screenreader""" + # We use a list and not a dict since we can have duplicate commands in the same payload + def send_ipc_commands( + commands: list[IPC_COMMAND], + ) -> list[Tuple[IPC_COMMAND, Optional[any]]]: + """Sends a bundle of commands to the screenreader""" actions.user.tts("No screenreader running to send commands to") raise NotImplementedError - def send_ipc_command(command: IPC_COMMAND): + # We need a separate command for single commands since we can't easily + # pass in a list via a .talon file and thus this allows a single string instead + def send_ipc_command( + command: IPC_COMMAND, + ) -> Optional[any]: """ Sends a single command to the screenreader. - This is its own function since old versions of talon - don't support union type hints and having a separate - function is a workaround a clearer API than passing in a list - for a single command + This is its own function since it is a clearer API than passing in + a list for a single command """ actions.user.tts("No screenreader running to send commands to") raise NotImplementedError @@ -48,7 +119,7 @@ def addon_server_endpoint() -> Tuple[str, str, str]: ) with open(SPEC_FILE, "r") as f: - spec = json.load(f) + spec: ServerSpec = json.load(f) address = spec["address"] port = spec["port"] valid_commands = spec["valid_commands"] @@ -64,23 +135,49 @@ def addon_server_endpoint() -> Tuple[str, str, str]: return address, port, valid_commands - def send_ipc_commands(commands: list[IPC_COMMAND]): + # Should be used only for single commands or debugging + def send_ipc_commands( + commands: list[IPC_COMMAND], + ) -> list[Tuple[IPC_COMMAND, Optional[any]]]: """Sends a list of commands or a single command string to the NVDA screenreader""" - ip, port, valid_commands = actions.user.addon_server_endpoint() + + # this function can still be called if NVDA is running, since cron + # is ran 400ms after the check, so we can check again here after the + # scheduler runs the function + if not actions.user.is_nvda_running(): + return + + try: + ip, port, valid_commands = actions.user.addon_server_endpoint() + except FileNotFoundError: + # If we just shut the server down via a talon command + # we need to check again after a delay once the controller + # client is updated + def check_if_shutdown(): + if not actions.user.is_nvda_running(): + # Ignore, we expect the server to be shut down + # and the file to be gone + return + else: + raise FileNotFoundError( + "NVDA is running but the server spec file is missing" + ) + + cron.after("2s", check_if_shutdown) for command in commands: if command not in valid_commands: - raise ValueError(f"Invalid command: {command}") + raise ValueError(f"Server cannot process command: {command}") sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(0.2) encoded = json.dumps(commands).encode() - if settings.get("user.addon_debug"): - print(f"Sending {commands} to {ip}:{port}") - # Default response if nothing is set - response = b"No response from NVDA addon server" + response: ResponseBundle = { + "client": IPCClientResponse.NO_RESPONSE, + "server": None, + } # Although the screenreader server will block while processing commands, # having a lock client-side prevents errors when sending multiple commands @@ -91,24 +188,43 @@ def send_ipc_commands(commands: list[IPC_COMMAND]): # Block until we receive a response # We don't want to execute commands until # we know the screen reader has the proper settings - response = sock.recv(1024) - if settings.get("user.addon_debug"): - print("Received", repr(response)) - - except socket.timeout as e: - print(f"Clientside connection with {ip}:{port} timed out") - print(e) - response = b"Clientside connection with NVDA timed out" - except Exception as e: - print(f"Error Communicating with NVDA extension: {e}") - response = b"Error communicating with NVDA extension" + raw_data = sock.recv(1024) + + raw_response: IPCServerResponse = json.loads(raw_data.decode("utf-8")) + raw_response["statusResults"] = [ + ServerStatusResult.generate_from(status) + for status in raw_response["statusResults"] + ] + response["client"] = IPCClientResponse.SUCCESS + response["server"] = raw_response + except KeyError as enum_decode_error: + print("Error decoding enum", enum_decode_error, response) + response["client"] = IPCClientResponse.GENERAL_ERROR + except socket.timeout: + response["client"] = IPCClientResponse.TIMED_OUT + except Exception as fallback_error: + response["client"] = IPCClientResponse.GENERAL_ERROR + print( + fallback_error, + response, + ) finally: sock.close() - return response.decode() - def send_ipc_command(command: IPC_COMMAND): + checked_result = handle_ipc_result(response["client"], response["server"]) + return checked_result + + def send_ipc_command( + command: IPC_COMMAND, + ) -> Optional[any]: """Sends a single command to the screenreader""" - return actions.user.send_ipc_commands([command]) + result: list[Tuple] = actions.user.send_ipc_commands([command]) + if len(result) != 1: + raise ValueError( + f"Expected 1 command to be sent, but got {len(result)} instead" + ) + _, value = result[FIRST_AND_ONLY_COMMAND := 0] + return value ORCAContext = Context() diff --git a/core/screenreader_ipc/ipc_schema.py b/core/screenreader_ipc/ipc_schema.py index 44f3b2e..acad2f2 100644 --- a/core/screenreader_ipc/ipc_schema.py +++ b/core/screenreader_ipc/ipc_schema.py @@ -1,12 +1,56 @@ -from typing import Literal +import enum +from typing import List, Literal, Any, TypedDict, Optional # Exhaustive list of commands that can be sent to the NVDA addon server + IPC_COMMAND = Literal[ "disableSpeechInterruptForCharacters", "enableSpeechInterruptForCharacters", + "getSpeechInterruptForCharacters", "disableSpeakTypedWords", "enableSpeakTypedWords", + "getSpeakTypedWords", "disableSpeakTypedCharacters", "enableSpeakTypedCharacters", + "getSpeakTypedCharacters", "debug", ] + + +class ServerSpec(TypedDict): + address: str + port: str + valid_commands: List[IPC_COMMAND] + + +class ServerStatusResult(enum.Enum): + SUCCESS = "success" + INTERNAL_SERVER_ERROR = "serverError" + INVALID_COMMAND_ERROR = "commandError" + RUNTIME_ERROR = "runtimeError" + JSON_ENCODE_ERROR = "jsonEncodeError" + + @staticmethod + def generate_from(value: str): + for member in ServerStatusResult: + if member.value == value: + return member + raise KeyError(f"Invalid status result: {value}") + + +class IPCServerResponse(TypedDict): + processedCommands: List[str] + returnedValues: List[Any] + statusResults: List[ServerStatusResult] + + +class IPCClientResponse(enum.Enum): + NO_RESPONSE = "noResponse" + TIMED_OUT = "timedOut" + GENERAL_ERROR = "generalError" + SUCCESS = "success" + + +class ResponseBundle(TypedDict): + client: IPCClientResponse + server: Optional[None | IPCServerResponse] diff --git a/nvda/.addOn/sight-free-talon-server/addon/globalPlugins/nvda-addon.py b/nvda/.addOn/sight-free-talon-server/addon/globalPlugins/nvda-addon.py index e0074d4..9ee05e5 100644 --- a/nvda/.addOn/sight-free-talon-server/addon/globalPlugins/nvda-addon.py +++ b/nvda/.addOn/sight-free-talon-server/addon/globalPlugins/nvda-addon.py @@ -6,22 +6,46 @@ import socket import threading import globalVars +import enum +import time # Exhaustive list of valid commands -schema = [ +valid_commands = [ "disableSpeechInterruptForCharacters", "enableSpeechInterruptForCharacters", + "getSpeechInterruptForCharacters", "disableSpeakTypedWords", "enableSpeakTypedWords", + "getSpeakTypedWords", "disableSpeakTypedCharacters", "enableSpeakTypedCharacters", + "getSpeakTypedCharacters", "debug", ] + +class ResponseSchema: + def __init__(self): + self.processedCommands = [] + self.returnedValues = [] + self.statusResults = [] + + def generate(): + return {"processedCommands": [], "returnedValues": [], "statusResults": []} + + # Handles both portable and installed versions of NVDA SPEC_PATH = os.path.join(globalVars.appArgs.configPath, "talon_server_spec.json") +class StatusResult(enum.Enum): + SUCCESS = "success" + INTERNAL_SERVER_ERROR = "serverError" + INVALID_COMMAND_ERROR = "commandError" + RUNTIME_ERROR = "runtimeError" + JSON_ENCODE_ERROR = "jsonEncodeError" + + # Bind to an open port in case the specified port is not available def bind_to_available_port(server_socket, start_port, end_port): for port in range(start_port, end_port): @@ -33,33 +57,58 @@ def bind_to_available_port(server_socket, start_port, end_port): raise OSError(f"No available ports in the range {start_port}-{end_port}") -# Change a setting based on a message from the client +# Process a command, return the command and result as well as the retrieved value, if applicable +# Yes this is a big if/else block, but it's the most efficient way to handle the commands def handle_command(command: str): - if command not in schema: - return f"Invalid command: '{command}', type='{type(command)}'" + command, value, result = command, None, None + + if command == "getSpeechInterruptForCharacters": + value, result = ( + config.conf["keyboard"]["speechInterruptForCharacters"], + StatusResult.SUCCESS, + ) - if command == "disableSpeechInterruptForCharacters": + elif command == "getSpeakTypedWords": + value, result = config.conf["keyboard"]["speakTypedWords"], StatusResult.SUCCESS + + elif command == "getSpeakTypedCharacters": + value, result = ( + config.conf["keyboard"]["speakTypedCharacters"], + StatusResult.SUCCESS, + ) + + elif command == "disableSpeechInterruptForCharacters": config.conf["keyboard"]["speechInterruptForCharacters"] = False + value, result = None, StatusResult.SUCCESS elif command == "enableSpeechInterruptForCharacters": config.conf["keyboard"]["speechInterruptForCharacters"] = True + value, result = None, StatusResult.SUCCESS elif command == "disableSpeakTypedWords": config.conf["keyboard"]["speakTypedWords"] = False + value, result = None, StatusResult.SUCCESS elif command == "enableSpeakTypedWords": config.conf["keyboard"]["speakTypedWords"] = True + value, result = None, StatusResult.SUCCESS elif command == "disableSpeakTypedCharacters": config.conf["keyboard"]["speakTypedCharacters"] = False + value, result = None, StatusResult.SUCCESS elif command == "enableSpeakTypedCharacters": config.conf["keyboard"]["speakTypedCharacters"] = True + value, result = None, StatusResult.SUCCESS elif command == "debug": tones.beep(640, 100) + value, result = None, StatusResult.SUCCESS - return f"Success running: {command}" + else: + value, result = None, StatusResult.INVALID_COMMAND_ERROR + + return command, value, result class IPC_Server: @@ -71,27 +120,31 @@ class IPC_Server: def handle_client(self, client_socket: socket.socket): data = client_socket.recv(1024) + response = ResponseSchema.generate() + try: messages = json.loads(data.decode().strip()) + + for message in messages: + command, value, result = handle_command(message) + response["processedCommands"].append(command) + response["returnedValues"].append(value) + # We can't pickle the StatusResult enum, so we have to convert it to a string + response["statusResults"].append(result.value) + except json.JSONDecodeError as e: print(f"RECEIVED INVALID JSON FROM TALON: {e}") - response = f"TALON SERVER ERROR: {e}".encode() - client_socket.sendall(response) - return - - result = "" - for message in messages: - result += handle_command(message) + response["statusResults"] = [StatusResult.JSON_ENCODE_ERROR.value] - response = f"TALON COMMAND PROCESSED: {result}".encode() - client_socket.sendall(response) + finally: + client_socket.sendall(json.dumps(response).encode("utf-8")) def output_spec_file(self): # write a json file to let clients know how to connect and what commands are available spec = { "address": "localhost", "port": str(self.get_port()), - "valid_commands": schema, + "valid_commands": valid_commands, } with open(SPEC_PATH, "w") as f: json.dump(spec, f) @@ -131,9 +184,10 @@ def create_server(self): os.path.join( globalVars.appArgs.configPath, "talon_server_error.log" ), - "w", + "a", ) as f: - f.write(str(e)) + # get the time and append it to the error log + f.write(f"ERROR AT {time.time()}: {e}") break finally: if self.client_socket: diff --git a/nvda/debug-nvda.talon b/nvda/debug-nvda.talon index 476fc65..d2c4c86 100644 --- a/nvda/debug-nvda.talon +++ b/nvda/debug-nvda.talon @@ -5,5 +5,8 @@ os: windows test reader add on: user.test_reader_addon() +# key(ctrl-shift-alt-g): +# user.test_reader_addon() + test controller client: user.test_controller_client() diff --git a/nvda/nvda.py b/nvda/nvda.py index 3fbc6ee..4098e85 100644 --- a/nvda/nvda.py +++ b/nvda/nvda.py @@ -40,8 +40,9 @@ def toggle_nvda(): elif actions.user.is_nvda_running(): actions.user.with_nvda_mod_press("q") actions.user.tts("Turning NVDA off") - time.sleep(1) - actions.key("enter") + # We need this delay so the post-phrase callback + # can be set even if nvda is turned off + cron.after("2s", lambda: actions.key("enter")) def restart_nvda(): """Restarts NVDA""" @@ -49,8 +50,9 @@ def restart_nvda(): actions.user.with_nvda_mod_press("q") time.sleep(0.5) actions.key("down") - time.sleep(0.5) - actions.key("enter") + # We need this delay so the post-phrase callback + # can be set even if nvda is turned off + cron.after("2s", lambda: actions.key("enter")) actions.user.tts("Restarting NVDA") def with_nvda_mod_press(key: str): @@ -88,8 +90,8 @@ def test_controller_client(): def test_reader_addon(): """Tests the reader addon""" - result = actions.user.send_ipc_command("debug") - actions.user.tts(f"Reader addon result: {result}") + actions.user.send_ipc_command("debug") + actions.user.tts("Success testing reader addon") ctxWindowsNVDARunning = Context() @@ -146,6 +148,8 @@ def switch_voice(): # Only send post:phrase callback if we sent a pre:phrase callback successfully pre_phrase_sent = False +reenable_commands = [] + # By default the screen reader will allow you to press a key and interrupt the ph # rase however this does not work alongside typing given the fact that we are pres @@ -161,13 +165,32 @@ def disable_interrupt(_): ): return - # bundle the commands into a single messge + # Commands are processed in order + # We first see the value for each setting, then we disable them + # We only need to enable them at the end of the phrase if + # there were enabled before commands = [ + "getSpeechInterruptForCharacters", + "getSpeakTypedWords", + "getSpeakTypedCharacters", "disableSpeechInterruptForCharacters", "disableSpeakTypedWords", "disableSpeakTypedCharacters", ] - actions.user.send_ipc_commands(commands) + results = actions.user.send_ipc_commands(commands) + global reenable_commands + reenable_commands = [] + for command, value in results: + match command: + case ( + ( + "getSpeechInterruptForCharacters" + | "getSpeakTypedWords" + | "getSpeakTypedCharacters" + ) as cmd + ) if value: + reenable_commands.append(cmd.replace("get", "enable")) + pre_phrase_sent = True @@ -184,12 +207,14 @@ def enable_interrupt(_): ): return - # bundle the commands into a single message - commands = [ - "enableSpeechInterruptForCharacters", - "enableSpeakTypedWords", - "enableSpeakTypedCharacters", - ] + global reenable_commands + # Can add more commands here if needed + # We don't need to send disable commands since they are already disabled + # at pre-phrase time + commands = reenable_commands + + if len(commands) == 0: + return # this is kind of a hack since we don't know exactly when to re enable it # because we don't have a callback at the end of the last keypress diff --git a/utils/access-focus.py b/utils/access-focus.py index e327e0f..b93f7ea 100644 --- a/utils/access-focus.py +++ b/utils/access-focus.py @@ -69,7 +69,7 @@ def focus_element_by_name(name: str, permissive: bool = True): try: # https://learn.microsoft.com/en-us/windows/win32/winauto/selflag # SELFLAG_TAKESELECTION = 2 - # Ideally we would use .select() but the API doesn't work + # We can also use .select() element.legacyiaccessible_pattern.do_default_action() except Exception as f: actions.user.tts(f"Failed to focus {name}")