Skip to content

Commit

Permalink
Merge pull request #28 from C-Loftus/restoreSettingInExtension
Browse files Browse the repository at this point in the history
IPC Refactor to support reenabling only the desired settings
  • Loading branch information
C-Loftus authored Mar 11, 2024
2 parents 0071d15 + 5f6d9c8 commit 028dea8
Show file tree
Hide file tree
Showing 8 changed files with 310 additions and 69 deletions.
3 changes: 1 addition & 2 deletions core/core-linux.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

@ctxLinux.action_class("user")
class UserActions:

def toggle_reader():
"""Toggles the screen reader on and off"""
actions.user.toggle_orca()
Expand Down Expand Up @@ -44,7 +43,7 @@ def tts(text: str, interrupt: bool = True):

def espeak(text: str):
"""Text to speech with a robotic/narrator voice"""
rate = settings.get("user.tts_speed", 0)
rate = settings.get("user.tts_speed")
# convert -10 to 10 to -100 to 100
rate = rate * 10
# text = remove_special(text)
Expand Down
2 changes: 1 addition & 1 deletion core/core-windows.py
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ def base_win_tts(text: str, interrupt: bool):
"""Base function for windows tts. We expose this
so we can share the speaker object across files. We don't want
it to get overridden by the other tts functions"""
speaker.set_rate(settings.get("user.tts_speed", 0))
speaker.set_rate(settings.get("user.tts_speed"))
speaker.set_volume(settings.get("user.tts_volume", 50))
speaker.speak(text, interrupt)

Expand Down
180 changes: 148 additions & 32 deletions core/screenreader_ipc/ipc_client.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,104 @@
from talon import Module, Context, actions, settings
from talon import Module, Context, actions, settings, cron
import os
import ipaddress
import json
import socket
import threading
from typing import Tuple
from .ipc_schema import IPC_COMMAND
from typing import Tuple, assert_never, Optional
from .ipc_schema import (
IPC_COMMAND,
IPCServerResponse,
IPCClientResponse,
ServerSpec,
ServerStatusResult,
ResponseBundle,
)


mod = Module()
lock = threading.Lock()


def handle_ipc_result(
client_response: IPCClientResponse,
server_response: IPCServerResponse,
) -> list[Tuple[IPC_COMMAND, Optional[any]]]:
"""
Sanitize the response from the screenreader server
and return just the commands and their return values
if present
"""
if settings.get("user.addon_debug"):
print(f"Received responses\n{client_response=}\n{server_response=}")

match client_response, server_response:
case (
(
IPCClientResponse.NO_RESPONSE
| IPCClientResponse.TIMED_OUT
| IPCClientResponse.GENERAL_ERROR,
_,
) as error
):
raise RuntimeError(
f"Clientside {error=} communicating with screenreader extension"
)
case (IPCClientResponse.SUCCESS, _):
# empty case is here for exhaustiveness
pass

for cmd, value, status in zip(
server_response["processedCommands"],
server_response["returnedValues"],
server_response["statusResults"],
):
match status:
case ServerStatusResult.SUCCESS:
# empty case is here for exhaustiveness
pass
case ServerStatusResult.INVALID_COMMAND_ERROR:
raise ValueError(f"Invalid command '{cmd}' sent to screenreader")
case ServerStatusResult.JSON_ENCODE_ERROR:
raise ValueError(
"Invalid JSON payload sent from client to screenreader"
)
case (
(
ServerStatusResult.INTERNAL_SERVER_ERROR
| ServerStatusResult.RUNTIME_ERROR
) as error
):
raise RuntimeError(f"{error} processing command '{cmd}'")
case _:
assert_never((cmd, value, status))

return list(
zip(server_response["processedCommands"], server_response["returnedValues"])
)


@mod.action_class
class Actions:
def addon_server_endpoint() -> Tuple[str, str, str]:
"""Returns the address, port, and valid commands for the addon server"""

def send_ipc_commands(commands: list[IPC_COMMAND]):
"""Sends a command or commands to the screenreader"""
# We use a list and not a dict since we can have duplicate commands in the same payload
def send_ipc_commands(
commands: list[IPC_COMMAND],
) -> list[Tuple[IPC_COMMAND, Optional[any]]]:
"""Sends a bundle of commands to the screenreader"""
actions.user.tts("No screenreader running to send commands to")
raise NotImplementedError

def send_ipc_command(command: IPC_COMMAND):
# We need a separate command for single commands since we can't easily
# pass in a list via a .talon file and thus this allows a single string instead
def send_ipc_command(
command: IPC_COMMAND,
) -> Optional[any]:
"""
Sends a single command to the screenreader.
This is its own function since old versions of talon
don't support union type hints and having a separate
function is a workaround a clearer API than passing in a list
for a single command
This is its own function since it is a clearer API than passing in
a list for a single command
"""
actions.user.tts("No screenreader running to send commands to")
raise NotImplementedError
Expand All @@ -48,7 +119,7 @@ def addon_server_endpoint() -> Tuple[str, str, str]:
)

with open(SPEC_FILE, "r") as f:
spec = json.load(f)
spec: ServerSpec = json.load(f)
address = spec["address"]
port = spec["port"]
valid_commands = spec["valid_commands"]
Expand All @@ -64,23 +135,49 @@ def addon_server_endpoint() -> Tuple[str, str, str]:

return address, port, valid_commands

def send_ipc_commands(commands: list[IPC_COMMAND]):
# Should be used only for single commands or debugging
def send_ipc_commands(
commands: list[IPC_COMMAND],
) -> list[Tuple[IPC_COMMAND, Optional[any]]]:
"""Sends a list of commands or a single command string to the NVDA screenreader"""
ip, port, valid_commands = actions.user.addon_server_endpoint()

# this function can still be called if NVDA is running, since cron
# is ran 400ms after the check, so we can check again here after the
# scheduler runs the function
if not actions.user.is_nvda_running():
return

try:
ip, port, valid_commands = actions.user.addon_server_endpoint()
except FileNotFoundError:
# If we just shut the server down via a talon command
# we need to check again after a delay once the controller
# client is updated
def check_if_shutdown():
if not actions.user.is_nvda_running():
# Ignore, we expect the server to be shut down
# and the file to be gone
return
else:
raise FileNotFoundError(
"NVDA is running but the server spec file is missing"
)

cron.after("2s", check_if_shutdown)

for command in commands:
if command not in valid_commands:
raise ValueError(f"Invalid command: {command}")
raise ValueError(f"Server cannot process command: {command}")

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.2)
encoded = json.dumps(commands).encode()

if settings.get("user.addon_debug"):
print(f"Sending {commands} to {ip}:{port}")

# Default response if nothing is set
response = b"No response from NVDA addon server"
response: ResponseBundle = {
"client": IPCClientResponse.NO_RESPONSE,
"server": None,
}

# Although the screenreader server will block while processing commands,
# having a lock client-side prevents errors when sending multiple commands
Expand All @@ -91,24 +188,43 @@ def send_ipc_commands(commands: list[IPC_COMMAND]):
# Block until we receive a response
# We don't want to execute commands until
# we know the screen reader has the proper settings
response = sock.recv(1024)
if settings.get("user.addon_debug"):
print("Received", repr(response))

except socket.timeout as e:
print(f"Clientside connection with {ip}:{port} timed out")
print(e)
response = b"Clientside connection with NVDA timed out"
except Exception as e:
print(f"Error Communicating with NVDA extension: {e}")
response = b"Error communicating with NVDA extension"
raw_data = sock.recv(1024)

raw_response: IPCServerResponse = json.loads(raw_data.decode("utf-8"))
raw_response["statusResults"] = [
ServerStatusResult.generate_from(status)
for status in raw_response["statusResults"]
]
response["client"] = IPCClientResponse.SUCCESS
response["server"] = raw_response
except KeyError as enum_decode_error:
print("Error decoding enum", enum_decode_error, response)
response["client"] = IPCClientResponse.GENERAL_ERROR
except socket.timeout:
response["client"] = IPCClientResponse.TIMED_OUT
except Exception as fallback_error:
response["client"] = IPCClientResponse.GENERAL_ERROR
print(
fallback_error,
response,
)
finally:
sock.close()
return response.decode()

def send_ipc_command(command: IPC_COMMAND):
checked_result = handle_ipc_result(response["client"], response["server"])
return checked_result

def send_ipc_command(
command: IPC_COMMAND,
) -> Optional[any]:
"""Sends a single command to the screenreader"""
return actions.user.send_ipc_commands([command])
result: list[Tuple] = actions.user.send_ipc_commands([command])
if len(result) != 1:
raise ValueError(
f"Expected 1 command to be sent, but got {len(result)} instead"
)
_, value = result[FIRST_AND_ONLY_COMMAND := 0]
return value


ORCAContext = Context()
Expand Down
46 changes: 45 additions & 1 deletion core/screenreader_ipc/ipc_schema.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,56 @@
from typing import Literal
import enum
from typing import List, Literal, Any, TypedDict, Optional

# Exhaustive list of commands that can be sent to the NVDA addon server

IPC_COMMAND = Literal[
"disableSpeechInterruptForCharacters",
"enableSpeechInterruptForCharacters",
"getSpeechInterruptForCharacters",
"disableSpeakTypedWords",
"enableSpeakTypedWords",
"getSpeakTypedWords",
"disableSpeakTypedCharacters",
"enableSpeakTypedCharacters",
"getSpeakTypedCharacters",
"debug",
]


class ServerSpec(TypedDict):
address: str
port: str
valid_commands: List[IPC_COMMAND]


class ServerStatusResult(enum.Enum):
SUCCESS = "success"
INTERNAL_SERVER_ERROR = "serverError"
INVALID_COMMAND_ERROR = "commandError"
RUNTIME_ERROR = "runtimeError"
JSON_ENCODE_ERROR = "jsonEncodeError"

@staticmethod
def generate_from(value: str):
for member in ServerStatusResult:
if member.value == value:
return member
raise KeyError(f"Invalid status result: {value}")


class IPCServerResponse(TypedDict):
processedCommands: List[str]
returnedValues: List[Any]
statusResults: List[ServerStatusResult]


class IPCClientResponse(enum.Enum):
NO_RESPONSE = "noResponse"
TIMED_OUT = "timedOut"
GENERAL_ERROR = "generalError"
SUCCESS = "success"


class ResponseBundle(TypedDict):
client: IPCClientResponse
server: Optional[None | IPCServerResponse]
Loading

0 comments on commit 028dea8

Please sign in to comment.