From ae81c76f6318a26c0fbb5c284d0111817bf1e817 Mon Sep 17 00:00:00 2001 From: ms264556 <29752086+ms264556@users.noreply.github.com> Date: Wed, 14 Aug 2024 14:59:45 +1200 Subject: [PATCH] Add minimal Ruckus One methods for Home Assistant --- aioruckus/ajaxsession.py | 68 +++++++++++ aioruckus/r1ajaxapi.py | 251 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 319 insertions(+) create mode 100644 aioruckus/r1ajaxapi.py diff --git a/aioruckus/ajaxsession.py b/aioruckus/ajaxsession.py index ed82735..5b43438 100644 --- a/aioruckus/ajaxsession.py +++ b/aioruckus/ajaxsession.py @@ -52,6 +52,10 @@ def __init__( # SmartZone State self.__service_ticket = None + # Ruckus One State + self.__tenant_id = None + self.__bearer_token = None + # API Implementation async def __aenter__(self) -> "AjaxSession": @@ -65,6 +69,10 @@ async def login(self) -> None: """Create HTTPS AJAX session.""" # locate the admin pages: /admin/* for Unleashed and ZD 9.x, /admin10/* for ZD 10.x try: + if self.host.lower().startswith("https://"): + parsed_url = urlparse(self.host) + if (parsed_url.netloc == "ruckus.cloud" or parsed_url.netloc.endswith(".ruckus.cloud")): + return await self.r1_login() async with self.websession.head( f"https://{self.host}", timeout=3, allow_redirects=False ) as head: @@ -136,6 +144,40 @@ async def login(self) -> None: self._api = RuckusAjaxApi(self) return self + async def r1_login(self) -> None: + """Create Ruckus One session.""" + try: + parsed_url = urlparse(self.host) + self.base_url = f"{parsed_url.scheme}://{parsed_url.netloc if parsed_url.netloc.startswith('api.') else 'api.' + parsed_url.netloc}" + self.__tenant_id = parsed_url.path[1:33] + + async with self.websession.post( + f"{self.base_url}/oauth2/token/{self.__tenant_id}", + headers={"Content-Type": "application/x-www-form-urlencoded"}, + data={"grant_type": "client_credentials", "client_id": self.username, "client_secret": self.password}, + timeout=20, + allow_redirects=False + ) as oauth2: + if oauth2.status != 200: + raise AuthenticationError(ERROR_LOGIN_INCORRECT) + oauth_info = await oauth2.json() + self.__bearer_token = f"Bearer {oauth_info['access_token']}" + # pylint: disable=import-outside-toplevel + from .r1ajaxapi import R1AjaxApi + self._api = R1AjaxApi(self) + return self + except KeyError as kerr: + raise ConnectionError(ERROR_CONNECT_EOF) from kerr + except IndexError as ierr: + raise ConnectionError(ERROR_CONNECT_EOF) from ierr + except aiohttp.ContentTypeError as cterr: + raise ConnectionError(ERROR_CONNECT_EOF) from cterr + except aiohttp.client_exceptions.ClientConnectorError as cerr: + raise ConnectionError(ERROR_CONNECT_EOF) from cerr + except asyncio.exceptions.TimeoutError as terr: + raise ConnectionError(ERROR_CONNECT_TIMEOUT) from terr + + async def sz_login(self) -> None: """Create SmartZone session.""" try: @@ -280,6 +322,32 @@ async def sz_query( ) -> dict: return (await self.sz_post(f"query/{cmd}", query))["list"] + async def r1_get( + self, + cmd: str, + params: dict = None, + timeout: int | None = None, + retrying: bool = False + ) -> dict: + """Get R1 Data""" + async with self.websession.get( + f"{self.base_url}/{cmd}", + headers={"Authorization": self.__bearer_token}, + params=params, + timeout=timeout, + allow_redirects=False + ) as response: + if response.status != 200: + # assume session is dead and re-login + if retrying: + # we tried logging in again, but the redirect still happens. + # an exception should have been raised from the login! + raise AuthenticationError(ERROR_POST_REDIRECTED) + await self.r1_login() # try logging in again, then retry post + return await self.r1_get(cmd, params, timeout, retrying=True) + result_json = await response.json() + return result_json + async def sz_get( self, cmd: str, diff --git a/aioruckus/r1ajaxapi.py b/aioruckus/r1ajaxapi.py new file mode 100644 index 0000000..91d4737 --- /dev/null +++ b/aioruckus/r1ajaxapi.py @@ -0,0 +1,251 @@ +"""Adds enough AJAX methods to RuckusApi to support Home Assistant""" + +from typing import List + +from .ruckusajaxapi import RuckusAjaxApi +from .typing_policy import * + +from .const import ( + SystemStat, +) +from .ajaxsession import AjaxSession + + +class R1AjaxApi(RuckusAjaxApi): + """Ruckus One Configuration, Statistics and Commands API""" + + def __init__(self, session: AjaxSession): + super().__init__(session) + + async def get_aps(self) -> List[dict]: + """Return a list of APs""" + aps = await self.session.r1_get("venues/aps") + compat_aps = [ + { + **ap, + "devname": ap["name"], + "version": ap["firmware"], + "serial": ap["serialNumber"], + } + for ap in aps + ] + return compat_aps + + async def get_ap_groups(self) -> List: + """Return a list of AP groups""" + raise NotImplementedError + + async def get_wlans(self) -> List[dict]: + """Return a list of WLANs""" + raise NotImplementedError + + async def get_wlan_groups(self) -> List[dict]: + """Return a list of WLAN groups""" + raise NotImplementedError + + async def get_urlfiltering_policies(self) -> list[UrlFilter | dict]: + """Return a list of URL Filtering Policies""" + raise NotImplementedError + + async def get_urlfiltering_blockingcategories( + self, + ) -> list[UrlBlockCategory | dict]: + """Return a list of URL Filtering Blocking Categories""" + raise NotImplementedError + + async def get_ip4_policies(self) -> list[Ip4Policy | dict]: + """Return a list of IP4 Policies""" + raise NotImplementedError + + async def get_ip6_policies(self) -> list[Ip6Policy | dict]: + """Return a list of IP6 Policies""" + raise NotImplementedError + + async def get_device_policies(self) -> list[DevicePolicy | dict]: + """Return a list of Device Policies""" + raise NotImplementedError + + async def get_precedence_policies(self) -> list[PrecedencePolicy | dict]: + """Return a list of Precedence Policies""" + raise NotImplementedError + + async def get_arc_policies(self) -> list[ArcPolicy | dict]: + """Return a list of Application Recognition & Control Policies""" + raise NotImplementedError + + async def get_arc_applications(self) -> list[ArcApplication | dict]: + """Return a list of Application Recognition & Control User Defined Applications""" + raise NotImplementedError + + async def get_arc_ports(self) -> list[ArcPort | dict]: + """Return a list of Application Recognition & Control User Defined Ports""" + raise NotImplementedError + + async def get_roles(self) -> list[Role | dict]: + """Return a list of Roles""" + raise NotImplementedError + + async def get_dpsks(self) -> list[Dpsk | dict]: + """Return a list of DPSKs""" + raise NotImplementedError + + async def get_system_info(self, *sections: SystemStat) -> dict: + """Return system information""" + tenant = await self.session.r1_get("tenants/self") + return { + "tenant": tenant, + "sysinfo": {"version": "R1", "serial": tenant["entitlementId"]}, + "identity": {"name": tenant["name"]} + } + + async def get_mesh_info(self) -> dict: + """Return dummy mesh information""" + # We need to implement this because Home Assistant uses the mesh + # name as the display name for any Ruskus network. + # We will use the Tenant Name instead. + tenant = await self.session.r1_get("tenants/self") + return tenant + + async def __get_cluster_state(self) -> dict: + """Return Cluster State""" + return await self.session.sz_get("cluster/state") + + async def get_zerotouch_mesh_ap_serials(self) -> dict: + """Return a list of Pre-approved AP serial numbers""" + raise NotImplementedError + + async def get_acls(self) -> list[L2Policy | dict]: + """Return a list of ACLs""" + raise NotImplementedError + + async def get_blocked_client_macs(self) -> list[L2Rule | dict]: + """Return a list of blocked client MACs""" + raise NotImplementedError + + async def get_active_clients(self, interval_stats: bool = False) -> List: + """Return a list of active clients""" + clients = await self.session.r1_get("clients") + compat_clients = [ + { + **client, + "ap": client["apMac"], + "hostname": client["hostname"] or client["mac"] + } + for client in clients + ] + return compat_clients + + async def get_inactive_clients(self) -> List: + """Return a list of inactive clients""" + raise NotImplementedError + + async def get_ap_stats(self) -> List: + """Return a list of AP statistics""" + aps = await self.session.r1_get("venues/aps") + compat_aps = [ + { + **ap, + "devname": ap["name"], + "firmware-version": ap["firmware"], + "serial-number": ap["serialNumber"], + } + for ap in aps + ] + return compat_aps + + async def get_ap_group_stats(self) -> List: + """Return a list of AP group statistics""" + raise NotImplementedError + + async def get_vap_stats(self) -> List: + """Return a list of Virtual AP (per-radio WLAN) statistics""" + return await self.session.sz_query("wlan") + + async def get_wlan_group_stats(self) -> List: + """Return a list of WLAN group statistics""" + raise NotImplementedError + + async def get_dpsk_stats(self) -> List: + """Return a list of AP group statistics""" + raise NotImplementedError + + async def get_active_rogues(self) -> list[dict]: + """Return a list of currently active rogue devices""" + raise NotImplementedError + + async def get_known_rogues(self, limit: int = 300) -> list[dict]: + """Return a list of known/recognized rogues devices""" + raise NotImplementedError + + async def get_blocked_rogues(self, limit: int = 300) -> list[dict]: + """Return a list of user blocked rogues devices""" + raise NotImplementedError + + async def get_all_alarms(self, limit: int = 300) -> list[dict]: + """Return a list of all alerts""" + raise NotImplementedError + + async def get_all_events(self, limit: int = 300) -> list[dict]: + """Return a list of all events""" + raise NotImplementedError + + async def get_wlan_events(self, *wlan_ids, limit: int = 300) -> list[dict]: + """Return a list of WLAN events""" + raise NotImplementedError + + async def get_ap_events(self, *ap_macs, limit: int = 300) -> list[dict]: + """Return a list of AP events""" + raise NotImplementedError + + async def get_client_events(self, limit: int = 300) -> list[dict]: + """Return a list of client events""" + raise NotImplementedError + + async def get_wired_client_events(self, limit: int = 300) -> list[dict]: + """Return a list of wired client events""" + raise NotImplementedError + + async def get_syslog(self) -> str: + """Return a list of syslog entries""" + raise NotImplementedError + + async def get_backup(self) -> bytes: + """Return a backup""" + raise NotImplementedError + + async def do_block_client(self, mac: str) -> None: + """Block a client""" + raise NotImplementedError + + async def do_unblock_client(self, mac: str) -> None: + """Unblock a client""" + raise NotImplementedError + + async def do_delete_ap_group(self, name: str) -> bool: + """Delete an AP group""" + raise NotImplementedError + + async def do_disable_wlan(self, name: str, disable_wlan: bool = True) -> None: + """Disable a WLAN""" + raise NotImplementedError + + async def do_enable_wlan(self, name: str) -> None: + """Enable a WLAN""" + raise NotImplementedError + + async def do_set_wlan_password( + self, name: str, passphrase: str, sae_passphrase: str = None + ) -> None: + raise NotImplementedError + + async def do_hide_ap_leds(self, mac: str, leds_off: bool = True) -> None: + """Hide AP LEDs""" + raise NotImplementedError + + async def do_show_ap_leds(self, mac: str) -> None: + """Show AP LEDs""" + raise NotImplementedError + + async def do_restart_ap(self, mac: str) -> None: + """Restart AP""" + raise NotImplementedError