diff --git a/plexapi/__init__.py b/plexapi/__init__.py index 1d4fb471e..12bfa9b1a 100644 --- a/plexapi/__init__.py +++ b/plexapi/__init__.py @@ -3,6 +3,7 @@ import os from logging.handlers import RotatingFileHandler from platform import uname +from typing import cast from uuid import getnode from plexapi.config import PlexConfig, reset_base_headers @@ -18,7 +19,7 @@ PROJECT = 'PlexAPI' VERSION = __version__ = const.__version__ TIMEOUT = CONFIG.get('plexapi.timeout', 30, int) -X_PLEX_CONTAINER_SIZE = CONFIG.get('plexapi.container_size', 100, int) +X_PLEX_CONTAINER_SIZE = cast(int, CONFIG.get('plexapi.container_size', 100, int)) X_PLEX_ENABLE_FAST_CONNECT = CONFIG.get('plexapi.enable_fast_connect', False, bool) # Plex Header Configuration diff --git a/plexapi/audio.py b/plexapi/audio.py index 10ba97689..309ff92eb 100644 --- a/plexapi/audio.py +++ b/plexapi/audio.py @@ -1,12 +1,15 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import os from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import quote_plus from typing import Any, Dict, List, Optional, TypeVar from plexapi import media, utils -from plexapi.base import Playable, PlexPartialObject, PlexHistory, PlexSession +from plexapi.base import Playable, PlexHistory, PlexPartialObject, PlexSession from plexapi.exceptions import BadRequest from plexapi.mixins import ( AdvancedSettingsMixin, SplitMergeMixin, UnmatchMatchMixin, ExtrasMixin, HubsMixin, PlayedUnplayedMixin, RatingMixin, @@ -15,6 +18,9 @@ ) from plexapi.playlist import Playlist +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + TAudio = TypeVar("TAudio", bound="Audio") @@ -53,7 +59,7 @@ class Audio(PlexPartialObject, PlayedUnplayedMixin): """ METADATA_TYPE = 'track' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.addedAt = utils.toDatetime(data.attrib.get('addedAt')) @@ -192,7 +198,7 @@ class Artist( TAG = 'Directory' TYPE = 'artist' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Audio._loadData(self, data) self.albumSort = utils.cast(int, data.attrib.get('albumSort', '-1')) @@ -323,7 +329,7 @@ class Album( TAG = 'Directory' TYPE = 'album' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Audio._loadData(self, data) self.collections = self.findItems(data, media.Collection) @@ -455,7 +461,7 @@ class Track( TAG = 'Track' TYPE = 'track' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Audio._loadData(self, data) Playable._loadData(self, data) @@ -535,7 +541,7 @@ class TrackSession(PlexSession, Track): """ _SESSIONTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Track._loadData(self, data) PlexSession._loadData(self, data) @@ -548,7 +554,7 @@ class TrackHistory(PlexHistory, Track): """ _HISTORYTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Track._loadData(self, data) PlexHistory._loadData(self, data) diff --git a/plexapi/base.py b/plexapi/base.py index 5373d3a96..fe6a50958 100644 --- a/plexapi/base.py +++ b/plexapi/base.py @@ -1,16 +1,44 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import re import weakref from functools import cached_property +from typing import ( + TYPE_CHECKING, + Any, + Callable, + Dict, + List, + Optional, + Set, + Type, + TypeVar, + Union, + cast, + overload, +) from urllib.parse import urlencode from xml.etree import ElementTree +from xml.etree.ElementTree import Element from plexapi import CONFIG, X_PLEX_CONTAINER_SIZE, log, utils from plexapi.exceptions import BadRequest, NotFound, UnknownType, Unsupported -USER_DONT_RELOAD_FOR_KEYS = set() +if TYPE_CHECKING: + from datetime import datetime + + from plexapi.library import LibrarySection + from plexapi.media import Session + from plexapi.server import PlexServer + +FunctionCheck = Callable[[Any, Any], bool] +PlexObjectT = TypeVar('PlexObjectT', bound='PlexObject') + +USER_DONT_RELOAD_FOR_KEYS: Set[str] = set() _DONT_RELOAD_FOR_KEYS = {'key'} -OPERATORS = { + +OPERATORS: Dict[str, FunctionCheck] = { 'exact': lambda v, q: v == q, 'iexact': lambda v, q: v.lower() == q.lower(), 'contains': lambda v, q: q in v, @@ -25,9 +53,9 @@ 'istartswith': lambda v, q: v.lower().startswith(q), 'endswith': lambda v, q: v.endswith(q), 'iendswith': lambda v, q: v.lower().endswith(q), - 'exists': lambda v, q: v is not None if q else v is None, - 'regex': lambda v, q: re.match(q, v), - 'iregex': lambda v, q: re.match(q, v, flags=re.IGNORECASE), + 'exists': lambda v, q: v is not None if q else v is None, # type: ignore # always True + 'regex': lambda v, q: bool(re.match(q, v)), + 'iregex': lambda v, q: bool(re.match(q, v, flags=re.IGNORECASE)), } @@ -40,23 +68,29 @@ class PlexObject: initpath (str): Relative path requested when retrieving specified `data` (optional). parent (:class:`~plexapi.base.PlexObject`): The parent object that this object is built from (optional). """ - TAG = None # xml element tag - TYPE = None # xml element type - key = None # plex relative url - - def __init__(self, server, data, initpath=None, parent=None): + TAG: Optional[str] = None # xml element tag + TYPE: Optional[str] = None # xml element type + key: Optional[str] = None # plex relative url + _INCLUDES: Dict[str, Any] = {} + + def __init__( + self, + server: PlexServer, + data: Element, + initpath: Optional[str] = None, + parent: Optional[PlexObject] = None, + ): self._server = server self._data = data self._initpath = initpath or self.key self._parent = weakref.ref(parent) if parent is not None else None - self._details_key = None # Allow overwriting previous attribute values with `None` when manually reloading self._overwriteNone = True # Automatically reload the object when accessing a missing attribute self._autoReload = CONFIG.get('plexapi.autoreload', True, bool) # Attribute to save batch edits for a single API call - self._edits = None + self._edits: Optional[Dict[str, Any]] = None if data is not None: self._loadData(data) @@ -73,7 +107,7 @@ def __setattr__(self, attr, value): if value is not None or attr.startswith('_') or attr not in self.__dict__ or overwriteNone: self.__dict__[attr] = value - def _clean(self, value): + def _clean(self, value: Optional[str] = None): """ Clean attr value for display in __repr__. """ if value: value = str(value).replace('/library/metadata/', '') @@ -82,18 +116,43 @@ def _clean(self, value): value = value.replace('/devices/', '') return value.replace(' ', '-')[:20] - def _buildItem(self, elem, cls=None, initpath=None): - """ Factory function to build objects based on registered PLEXOBJECTS. """ + @overload + def _buildItem( + self, + elem: Element, + cls: Type[PlexObjectT], + initpath: Optional[str], + ) -> PlexObjectT: + ... + + @overload + def _buildItem( + self, + elem: Element, + cls: None = None, + initpath: Optional[str] = None, + ) -> PlexObject: + ... + + def _buildItem( + self, + elem: Element, + cls: Optional[Type[PlexObjectT]] = None, + initpath: Optional[str] = None, + ) -> Union[PlexObjectT, PlexObject]: + """Factory function to build objects based on registered PLEXOBJECTS.""" # cls is specified, build the object and return initpath = initpath or self._initpath if cls is not None: return cls(self._server, elem, initpath, parent=self) # cls is not specified, try looking it up in PLEXOBJECTS - etype = elem.attrib.get('streamType', elem.attrib.get('tagType', elem.attrib.get('type'))) - ehash = f'{elem.tag}.{etype}' if etype else elem.tag - if initpath == '/status/sessions': + etype = elem.attrib.get( + "streamType", elem.attrib.get("tagType", elem.attrib.get("type")) + ) + ehash = f"{elem.tag}.{etype}" if etype else f"{elem.tag}" + if initpath == "/status/sessions": ehash = f"{ehash}.session" - elif initpath.startswith('/status/sessions/history'): + elif initpath and initpath.startswith("/status/sessions/history"): ehash = f"{ehash}.history" ecls = utils.PLEXOBJECTS.get(ehash, utils.PLEXOBJECTS.get(elem.tag)) # log.debug('Building %s as %s', elem.tag, ecls.__name__) @@ -101,7 +160,30 @@ def _buildItem(self, elem, cls=None, initpath=None): return ecls(self._server, elem, initpath) raise UnknownType(f"Unknown library type <{elem.tag} type='{etype}'../>") - def _buildItemOrNone(self, elem, cls=None, initpath=None): + @overload + def _buildItemOrNone( + self, + elem: Element, + cls: Type[PlexObjectT], + initpath: Optional[str] = None, + ) -> Optional[PlexObjectT]: + ... + + @overload + def _buildItemOrNone( + self, + elem: Element, + cls: None = None, + initpath: Optional[str] = None, + ) -> Optional[PlexObject]: + ... + + def _buildItemOrNone( + self, + elem: Element, + cls: Optional[Type[PlexObjectT]] = None, + initpath: Optional[str] = None, + ) -> Optional[Union[PlexObjectT, PlexObject]]: """ Calls :func:`~plexapi.base.PlexObject._buildItem` but returns None if elem is an unknown type. """ @@ -110,7 +192,7 @@ def _buildItemOrNone(self, elem, cls=None, initpath=None): except UnknownType: return None - def _buildDetailsKey(self, **kwargs): + def _buildDetailsKey(self, **kwargs: Any) -> Optional[str]: """ Builds the details key with the XML include parameters. All parameters are included by default with the option to override each parameter or disable each parameter individually by setting it to False or 0. @@ -126,7 +208,7 @@ def _buildDetailsKey(self, **kwargs): details_key += '?' + urlencode(sorted(includes.items())) return details_key - def _isChildOf(self, **kwargs): + def _isChildOf(self, **kwargs: Any): """ Returns True if this object is a child of the given attributes. This will search the parent objects all the way to the top. @@ -134,14 +216,30 @@ def _isChildOf(self, **kwargs): **kwargs (dict): The attributes and values to search for in the parent objects. See all possible `**kwargs*` in :func:`~plexapi.base.PlexObject.fetchItem`. """ - obj = self + obj = self # ? Why is this needed? while obj and obj._parent is not None: - obj = obj._parent() + obj = obj._parent() # type: ignore # TODO: Fix this type ignore if obj and obj._checkAttrs(obj._data, **kwargs): return True return False - def _manuallyLoadXML(self, xml, cls=None): + @overload + def _manuallyLoadXML( + self, + xml: str, + cls: Type[PlexObjectT], + ) -> PlexObjectT: + ... + + @overload + def _manuallyLoadXML( + self, + xml: str, + cls: None = None, + ) -> PlexObject: + ... + + def _manuallyLoadXML(self, xml: str, cls: Optional[Type[PlexObjectT]] = None) -> Optional[Union[PlexObjectT, PlexObject]]: """ Manually load an XML string as a :class:`~plexapi.base.PlexObject`. Parameters: @@ -154,7 +252,39 @@ def _manuallyLoadXML(self, xml, cls=None): elem = ElementTree.fromstring(xml) return self._buildItemOrNone(elem, cls) - def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, maxresults=None, **kwargs): + @overload + def fetchItems( + self, + ekey: Union[str, List[int]], + cls: None, + container_start: Optional[int] = None, + container_size: Optional[int] = None, + maxresults: Optional[int] = None, + **kwargs: Any, + ) -> List[PlexObject]: + ... + + @overload + def fetchItems( + self, + ekey: Union[str, List[int]], + cls: Type[PlexObjectT], + container_start: Optional[int] = None, + container_size: Optional[int] = None, + maxresults: Optional[int] = None, + **kwargs: Any, + ) -> List[PlexObjectT]: + ... + + def fetchItems( + self, + ekey: Union[str, List[int]], + cls: Optional[Type[PlexObjectT]] = None, + container_start: Optional[int] = None, + container_size: Optional[int] = None, + maxresults: Optional[int] = None, + **kwargs: Any, + ) -> Union[List[PlexObjectT], List[PlexObject]]: """ Load the specified key to find and build all items with the specified tag and attrs. @@ -231,12 +361,11 @@ def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, fetchItem(ekey, Media__Part__file__startswith="D:\\Movies") """ - if ekey is None: - raise BadRequest('ekey was not provided') - if isinstance(ekey, list) and all(isinstance(key, int) for key in ekey): + if isinstance(ekey, list) and all(isinstance(key, int) for key in ekey): # type: ignore # unnecessary isinstance ekey = f'/library/metadata/{",".join(str(key) for key in ekey)}' + ekey = cast(str, ekey) # TODO: Remove this cast when ekey is no longer a list container_start = container_start or 0 container_size = container_size or X_PLEX_CONTAINER_SIZE offset = container_start @@ -244,8 +373,8 @@ def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, if maxresults is not None: container_size = min(container_size, maxresults) - results = [] - subresults = [] + results: List[PlexObject] = [] + subresults: List[PlexObject] = [] headers = {} while True: @@ -253,6 +382,8 @@ def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, headers['X-Plex-Container-Size'] = str(container_size) data = self._server.query(ekey, headers=headers) + if not data: + return [] subresults = self.findItems(data, cls, ekey, **kwargs) total_size = utils.cast(int, data.attrib.get('totalSize') or data.attrib.get('size')) or len(subresults) @@ -282,7 +413,30 @@ def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, return results - def fetchItem(self, ekey, cls=None, **kwargs): + @overload + def fetchItem( + self, + ekey: Union[str, int], + cls: None, + **kwargs: Any, + ) -> PlexObject: + ... + + @overload + def fetchItem( + self, + ekey: Union[str, int], + cls: Type[PlexObjectT], + **kwargs: Any, + ) -> PlexObjectT: + ... + + def fetchItem( + self, + ekey: Union[str, int], + cls: Optional[Type[PlexObjectT]] = None, + **kwargs: Any, + ) -> Union[PlexObjectT, PlexObject]: """ Load the specified key to find and build the first item with the specified tag and attrs. If no tag or attrs are specified then the first item in the result set is returned. @@ -309,7 +463,36 @@ def fetchItem(self, ekey, cls=None, **kwargs): clsname = cls.__name__ if cls else 'None' raise NotFound(f'Unable to find elem: cls={clsname}, attrs={kwargs}') from None - def findItems(self, data, cls=None, initpath=None, rtag=None, **kwargs): + @overload + def findItems( + self, + data: Element, + cls: None, + initpath: Optional[str] = None, + rtag: Optional[str] = None, + **kwargs: Any, + ) -> List[PlexObject]: + ... + + @overload + def findItems( + self, + data: Element, + cls: Type[PlexObjectT], + initpath: Optional[str] = None, + rtag: Optional[str] = None, + **kwargs: Any, + ) -> List[PlexObjectT]: + ... + + def findItems( # type: ignore # TODO: Fix this type ignore, seems to be bug + self, + data: Element, + cls: Optional[Type[PlexObjectT]] = None, + initpath: Optional[str] = None, + rtag: Optional[str] = None, + **kwargs: Any, + ): """ Load the specified data to find and build all items with the specified tag and attrs. See :func:`~plexapi.base.PlexObject.fetchItem` for more details on how this is used. @@ -321,9 +504,9 @@ def findItems(self, data, cls=None, initpath=None, rtag=None, **kwargs): kwargs['type'] = cls.TYPE # rtag to iter on a specific root tag using breadth-first search if rtag: - data = next(utils.iterXMLBFS(data, rtag), []) + data = next(utils.iterXMLBFS(data, rtag), Element('empty')) # loop through all data elements to find matches - items = [] + items: List[Union[PlexObject, PlexObjectT]] = [] for elem in data: if self._checkAttrs(elem, **kwargs): item = self._buildItemOrNone(elem, cls, initpath) @@ -331,7 +514,14 @@ def findItems(self, data, cls=None, initpath=None, rtag=None, **kwargs): items.append(item) return items - def findItem(self, data, cls=None, initpath=None, rtag=None, **kwargs): + def findItem( + self, + data: Element, + cls: Optional[Type[PlexObjectT]] = None, + initpath: Optional[str] = None, + rtag: Optional[str] = None, + **kwargs: Any, + ): """ Load the specified data to find and build the first items with the specified tag and attrs. See :func:`~plexapi.base.PlexObject.fetchItem` for more details on how this is used. @@ -341,26 +531,26 @@ def findItem(self, data, cls=None, initpath=None, rtag=None, **kwargs): except IndexError: return None - def firstAttr(self, *attrs): + def firstAttr(self, *attrs: str): """ Return the first attribute in attrs that is not None. """ for attr in attrs: value = getattr(self, attr, None) if value is not None: return value - def listAttrs(self, data, attr, rtag=None, **kwargs): + def listAttrs(self, data: Element, attr: str, rtag: Optional[str] = None, **kwargs: Any): """ Return a list of values from matching attribute. """ results = [] # rtag to iter on a specific root tag using breadth-first search if rtag: - data = next(utils.iterXMLBFS(data, rtag), []) + data = next(utils.iterXMLBFS(data, rtag), Element('empty')) for elem in data: kwargs[f'{attr}__exists'] = True if self._checkAttrs(elem, **kwargs): results.append(elem.attrib.get(attr)) return results - def reload(self, key=None, **kwargs): + def reload(self, key: Optional[str] = None, **kwargs: Any): """ Reload the data for this object from self.key. Parameters: @@ -393,7 +583,7 @@ def reload(self, key=None, **kwargs): """ return self._reload(key=key, **kwargs) - def _reload(self, key=None, _overwriteNone=True, **kwargs): + def _reload(self, key: Optional[str] = None, _overwriteNone: bool = True, **kwargs: Any): """ Perform the actual reload. """ details_key = self._buildDetailsKey(**kwargs) if kwargs else self._details_key key = key or details_key or self.key @@ -401,12 +591,14 @@ def _reload(self, key=None, _overwriteNone=True, **kwargs): raise Unsupported('Cannot reload an object not built from a URL.') self._initpath = key data = self._server.query(key) + if data is None: + raise NotFound(f'Unable to find elem: {key=}') self._overwriteNone = _overwriteNone self._loadData(data[0]) self._overwriteNone = True return self - def _checkAttrs(self, elem, **kwargs): + def _checkAttrs(self, elem: Element, **kwargs: Any): attrsFound = {} for attr, query in kwargs.items(): attr, op, operator = self._getAttrOperator(attr) @@ -424,7 +616,7 @@ def _checkAttrs(self, elem, **kwargs): # log.debug('Checking %s for %s found: %s', elem.tag, kwargs, attrsFound) return all(attrsFound.values()) - def _getAttrOperator(self, attr): + def _getAttrOperator(self, attr: str): for op, operator in OPERATORS.items(): if attr.endswith(f'__{op}'): attr = attr.rsplit('__', 1)[0] @@ -432,11 +624,11 @@ def _getAttrOperator(self, attr): # default to exact match return attr, 'exact', OPERATORS['exact'] - def _getAttrValue(self, elem, attrstr, results=None): + def _getAttrValue(self, elem: Element, attrstr: str, results: Optional[List[str]] = None) -> List[str]: # log.debug('Fetching %s in %s', attrstr, elem.tag) parts = attrstr.split('__', 1) attr = parts[0] - attrstr = parts[1] if len(parts) == 2 else None + attrstr = parts[1] if len(parts) == 2 else "" if attrstr: results = [] if results is None else results for child in [c for c in elem if c.tag.lower() == attr.lower()]: @@ -451,7 +643,7 @@ def _getAttrValue(self, elem, attrstr, results=None): return [value] return [] - def _castAttrValue(self, op, query, value): + def _castAttrValue(self, op: str, query: Any, value: str) -> Any: if op == 'exists': return value if isinstance(query, bool): @@ -464,7 +656,7 @@ def _castAttrValue(self, op, query, value): return float(value) return value - def _loadData(self, data): + def _loadData(self, data: Element): raise NotImplementedError('Abstract method not implemented.') @property @@ -500,7 +692,7 @@ class PlexPartialObject(PlexObject): 'includeStations': 1 } - def __eq__(self, other): + def __eq__(self, other: Any) -> bool: if isinstance(other, PlexPartialObject): return self.key == other.key return NotImplemented @@ -511,17 +703,20 @@ def __hash__(self): def __iter__(self): yield self - def __getattribute__(self, attr): + def __getattribute__(self, attr: str): # Dragons inside.. :-/ - value = super(PlexPartialObject, self).__getattribute__(attr) + value = super().__getattribute__(attr) # Check a few cases where we don't want to reload - if attr in _DONT_RELOAD_FOR_KEYS: return value - if attr in USER_DONT_RELOAD_FOR_KEYS: return value - if attr.startswith('_'): return value - if value not in (None, []): return value - if self.isFullObject(): return value - if isinstance(self, (PlexSession, PlexHistory)): return value - if self._autoReload is False: return value + if ( + attr in _DONT_RELOAD_FOR_KEYS | USER_DONT_RELOAD_FOR_KEYS + or attr.startswith("_") + or value not in (None, []) + or self.isFullObject() + or isinstance(self, (PlexSession, PlexHistory)) + or self._autoReload is False + ): + return value + # Log the reload. clsname = self.__class__.__name__ title = self.__dict__.get('title', self.__dict__.get('name')) @@ -550,7 +745,7 @@ def analyze(self): * Generate intro video markers: Detects show intros, exposing the 'Skip Intro' button in clients. """ - key = f"/{self.key.lstrip('/')}/analyze" + key = f"/{self.key.lstrip('/')}/analyze" if self.key else None self._server.query(key, method=self._server._session.put) def isFullObject(self): @@ -585,7 +780,7 @@ def _edit(self, **kwargs): self.section()._edit(items=self, **kwargs) return self - def edit(self, **kwargs): + def edit(self, **kwargs: Any): """ Edit an object. Note: This is a low level method and you need to know all the field/tag keys. See :class:`~plexapi.mixins.EditFieldMixin` and :class:`~plexapi.mixins.EditTagsMixin` @@ -613,7 +808,7 @@ def edit(self, **kwargs): """ return self._edit(**kwargs) - def batchEdits(self): + def batchEdits(self) -> PlexPartialObject: """ Enable batch editing mode to save API calls. Must call :func:`~plexapi.base.PlexPartialObject.saveEdits` at the end to save all the edits. See :class:`~plexapi.mixins.EditFieldMixin` and :class:`~plexapi.mixins.EditTagsMixin` @@ -664,8 +859,10 @@ def refresh(self): key = f'{self.key}/refresh' self._server.query(key, method=self._server._session.put) - def section(self): + def section(self) -> LibrarySection: """ Returns the :class:`~plexapi.library.LibrarySection` this item belongs to. """ + # TODO: consider moving this to MediaContainer as `librarySectionByID` is not available + # or bring in the attr to this class return self._server.library.sectionByID(self.librarySectionID) def delete(self): @@ -677,7 +874,7 @@ def delete(self): 'have not allowed items to be deleted', self.key) raise - def history(self, maxresults=None, mindate=None): + def history(self, maxresults: Optional[int] = None, mindate: Optional[datetime] = None): """ Get Play History for a media item. Parameters: @@ -709,7 +906,7 @@ def playQueue(self, *args, **kwargs): return PlayQueue.create(self._server, self, *args, **kwargs) -class Playable: +class Playable(PlexPartialObject): """ This is a general place to store functions specific to media that is Playable. Things were getting mixed up a bit when dealing with Shows, Season, Artists, Albums which are all not playable. @@ -719,7 +916,7 @@ class Playable: playQueueItemID (int): PlayQueue item ID (only populated for :class:`~plexapi.playlist.PlayQueue` items). """ - def _loadData(self, data): + def _loadData(self, data: Element): self.playlistItemID = utils.cast(int, data.attrib.get('playlistItemID')) # playlist self.playQueueItemID = utils.cast(int, data.attrib.get('playQueueItemID')) # playqueue @@ -885,7 +1082,7 @@ def updateTimeline(self, time, state='stopped', duration=None): return self -class PlexSession(object): +class PlexSession(PlexObject): """ This is a general place to store functions specific to media that is a Plex Session. Attributes: @@ -898,16 +1095,16 @@ class PlexSession(object): if item is being transcoded (None otherwise). """ - def _loadData(self, data): + def _loadData(self, data: Element): self.live = utils.cast(bool, data.attrib.get('live', '0')) self.player = self.findItem(data, etag='Player') - self.session = self.findItem(data, etag='Session') + self.session = cast(Session, self.findItem(data, etag='Session')) self.sessionKey = utils.cast(int, data.attrib.get('sessionKey')) self.transcodeSession = self.findItem(data, etag='TranscodeSession') user = data.find('User') - self._username = user.attrib.get('title') - self._userId = utils.cast(int, user.attrib.get('id')) + self._username = user.attrib.get('title') if user is not None else None + self._userId = utils.cast(int, user.attrib.get('id')) if user is not None else None # For backwards compatibility self.players = [self.player] if self.player else [] @@ -940,6 +1137,8 @@ def _reload(self, _autoReload=False, **kwargs): key = self._initpath data = self._server.query(key) + if not data: + raise NotFound(f'Unable to find elem: {key=}') for elem in data: if elem.attrib.get('sessionKey') == str(self.sessionKey): self._loadData(elem) @@ -948,7 +1147,7 @@ def _reload(self, _autoReload=False, **kwargs): def source(self): """ Return the source media object for the session. """ - return self.fetchItem(self._details_key) + return self._details_key and self.fetchItem(self._details_key) def stop(self, reason=''): """ Stop playback for the session. @@ -964,7 +1163,7 @@ def stop(self, reason=''): return self._server.query(key, params=params) -class PlexHistory(object): +class PlexHistory(PlexObject): """ This is a general place to store functions specific to media that is a Plex history item. Attributes: @@ -974,7 +1173,7 @@ class PlexHistory(object): viewedAt (datetime): Datetime item was last watched. """ - def _loadData(self, data): + def _loadData(self, data: Element): self.accountID = utils.cast(int, data.attrib.get('accountID')) self.deviceID = utils.cast(int, data.attrib.get('deviceID')) self.historyKey = data.attrib.get('historyKey') @@ -1013,7 +1212,7 @@ class MediaContainer(PlexObject): """ TAG = 'MediaContainer' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.allowSync = utils.cast(int, data.attrib.get('allowSync')) self.augmentationKey = data.attrib.get('augmentationKey') diff --git a/plexapi/client.py b/plexapi/client.py index 279b49742..e4a1f234c 100644 --- a/plexapi/client.py +++ b/plexapi/client.py @@ -1,14 +1,21 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import time +from typing import TYPE_CHECKING from xml.etree import ElementTree import requests +from requests.status_codes import _codes as codes from plexapi import BASE_HEADERS, CONFIG, TIMEOUT, log, logfilter, utils from plexapi.base import PlexObject from plexapi.exceptions import BadRequest, NotFound, Unauthorized, Unsupported from plexapi.playqueue import PlayQueue -from requests.status_codes import _codes as codes + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + DEFAULT_MTYPE = 'video' @@ -119,7 +126,7 @@ def reload(self): """ Alias to self.connect(). """ return self.connect() - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.deviceClass = data.attrib.get('deviceClass') @@ -602,7 +609,7 @@ class ClientTimeline(PlexObject): key = 'timeline/poll' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.address = data.attrib.get('address') self.audioStreamId = utils.cast(int, data.attrib.get('audioStreamId')) diff --git a/plexapi/collection.py b/plexapi/collection.py index 809455eae..a056fdb62 100644 --- a/plexapi/collection.py +++ b/plexapi/collection.py @@ -1,5 +1,8 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import quote_plus from plexapi import media, utils @@ -13,6 +16,9 @@ ) from plexapi.utils import deprecated +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + @utils.registerPlexObject class Collection( @@ -64,7 +70,7 @@ class Collection( TAG = 'Directory' TYPE = 'collection' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.addedAt = utils.toDatetime(data.attrib.get('addedAt')) self.art = data.attrib.get('art') diff --git a/plexapi/library.py b/plexapi/library.py index b469ad61e..1a83bcb8d 100644 --- a/plexapi/library.py +++ b/plexapi/library.py @@ -1,7 +1,10 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import re from datetime import datetime from functools import cached_property +from typing import TYPE_CHECKING from urllib.parse import parse_qs, quote_plus, urlencode, urlparse from plexapi import log, media, utils @@ -14,6 +17,9 @@ from plexapi.settings import Setting from plexapi.utils import deprecated +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + class Library(PlexObject): """ Represents a PlexServer library. This contains all sections of media defined @@ -29,7 +35,7 @@ class Library(PlexObject): """ key = '/library' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.identifier = data.attrib.get('identifier') self.mediaTagVersion = data.attrib.get('mediaTagVersion') @@ -401,7 +407,7 @@ class LibrarySection(PlexObject): uuid (str): Unique id for the section (32258d7c-3e6c-4ac5-98ad-bad7a3b78c63) """ - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.agent = data.attrib.get('agent') self.allowSync = utils.cast(bool, data.attrib.get('allowSync')) @@ -2108,7 +2114,7 @@ class LibraryTimeline(PlexObject): """ TAG = 'LibraryTimeline' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.size = utils.cast(int, data.attrib.get('size')) @@ -2137,7 +2143,7 @@ class Location(PlexObject): """ TAG = 'Location' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.id = utils.cast(int, data.attrib.get('id')) @@ -2163,7 +2169,7 @@ class Hub(PlexObject): """ TAG = 'Hub' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.context = data.attrib.get('context') @@ -2222,7 +2228,7 @@ class LibraryMediaTag(PlexObject): """ TAG = 'Directory' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.count = utils.cast(int, data.attrib.get('count')) @@ -2612,7 +2618,7 @@ def __repr__(self): _type = self._clean(self.firstAttr('type')) return f"<{':'.join([p for p in [self.__class__.__name__, _type] if p])}>" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.active = utils.cast(bool, data.attrib.get('active', '0')) self.fields = self.findItems(data, FilteringField) @@ -2806,7 +2812,7 @@ class FilteringFilter(PlexObject): """ TAG = 'Filter' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.filter = data.attrib.get('filter') self.filterType = data.attrib.get('filterType') @@ -2831,7 +2837,7 @@ class FilteringSort(PlexObject): """ TAG = 'Sort' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.active = utils.cast(bool, data.attrib.get('active', '0')) @@ -2856,7 +2862,7 @@ class FilteringField(PlexObject): """ TAG = 'Field' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.key = data.attrib.get('key') @@ -2879,7 +2885,7 @@ def __repr__(self): _type = self._clean(self.firstAttr('type')) return f"<{':'.join([p for p in [self.__class__.__name__, _type] if p])}>" - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.type = data.attrib.get('type') @@ -2896,7 +2902,7 @@ class FilteringOperator(PlexObject): """ TAG = 'Operator' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self.key = data.attrib.get('key') self.title = data.attrib.get('title') @@ -2918,7 +2924,7 @@ class FilterChoice(PlexObject): """ TAG = 'Directory' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.fastKey = data.attrib.get('fastKey') @@ -2944,7 +2950,7 @@ class ManagedHub(PlexObject): """ TAG = 'Hub' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.deletable = utils.cast(bool, data.attrib.get('deletable', True)) @@ -3068,7 +3074,7 @@ class Folder(PlexObject): title (str): Title of folder. """ - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self.key = data.attrib.get('key') self.title = data.attrib.get('title') @@ -3108,7 +3114,7 @@ class FirstCharacter(PlexObject): title (str): Character (#, !, A, B, C, ...). """ - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.key = data.attrib.get('key') @@ -3130,7 +3136,7 @@ class Path(PlexObject): """ TAG = 'Path' - def _loadData(self, data): + def _loadData(self, data: Element): self.home = utils.cast(bool, data.attrib.get('home')) self.key = data.attrib.get('key') self.network = utils.cast(bool, data.attrib.get('network')) @@ -3159,7 +3165,7 @@ class File(PlexObject): """ TAG = 'File' - def _loadData(self, data): + def _loadData(self, data: Element): self.key = data.attrib.get('key') self.path = data.attrib.get('path') self.title = data.attrib.get('title') @@ -3207,7 +3213,7 @@ class Common(PlexObject): """ TAG = 'Common' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.collections = self.findItems(data, media.Collection) self.contentRating = data.attrib.get('contentRating') diff --git a/plexapi/media.py b/plexapi/media.py index 4ea5c28cb..c841bb96f 100644 --- a/plexapi/media.py +++ b/plexapi/media.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import xml from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import quote_plus from plexapi import log, settings, utils @@ -8,6 +11,9 @@ from plexapi.exceptions import BadRequest from plexapi.utils import deprecated +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + @utils.registerPlexObject class Media(PlexObject): @@ -48,7 +54,7 @@ class Media(PlexObject): """ TAG = 'Media' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.aspectRatio = utils.cast(float, data.attrib.get('aspectRatio')) @@ -130,7 +136,7 @@ class MediaPart(PlexObject): """ TAG = 'Part' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.accessible = utils.cast(bool, data.attrib.get('accessible')) @@ -263,7 +269,7 @@ class MediaPartStream(PlexObject): type (int): Alias for streamType. """ - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.bitrate = utils.cast(int, data.attrib.get('bitrate')) @@ -330,7 +336,7 @@ class VideoStream(MediaPartStream): TAG = 'Stream' STREAMTYPE = 1 - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ super(VideoStream, self)._loadData(data) self.anamorphic = data.attrib.get('anamorphic') @@ -399,7 +405,7 @@ class AudioStream(MediaPartStream): TAG = 'Stream' STREAMTYPE = 2 - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ super(AudioStream, self)._loadData(data) self.audioChannelLayout = data.attrib.get('audioChannelLayout') @@ -455,7 +461,7 @@ class SubtitleStream(MediaPartStream): TAG = 'Stream' STREAMTYPE = 3 - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ super(SubtitleStream, self)._loadData(data) self.container = data.attrib.get('container') @@ -495,7 +501,7 @@ class LyricStream(MediaPartStream): TAG = 'Stream' STREAMTYPE = 4 - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ super(LyricStream, self)._loadData(data) self.format = data.attrib.get('format') @@ -516,7 +522,7 @@ class Session(PlexObject): """ TAG = 'Session' - def _loadData(self, data): + def _loadData(self, data: Element): self.id = data.attrib.get('id') self.bandwidth = utils.cast(int, data.attrib.get('bandwidth')) self.location = data.attrib.get('location') @@ -561,7 +567,7 @@ class TranscodeSession(PlexObject): """ TAG = 'TranscodeSession' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.audioChannels = utils.cast(int, data.attrib.get('audioChannels')) @@ -603,7 +609,7 @@ class TranscodeJob(PlexObject): Active or paused optimization items. Usually one item as a time.""" TAG = 'TranscodeJob' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.generatorID = data.attrib.get('generatorID') self.key = data.attrib.get('key') @@ -622,7 +628,7 @@ class Optimized(PlexObject): Optimized items are optimized and queued conversions items.""" TAG = 'Item' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.id = data.attrib.get('id') self.composite = data.attrib.get('composite') @@ -660,7 +666,7 @@ class Conversion(PlexObject): Conversions are items queued for optimization or being actively optimized.""" TAG = 'Video' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.addedAt = data.attrib.get('addedAt') self.art = data.attrib.get('art') @@ -735,7 +741,7 @@ def __str__(self): """ Returns the tag name. """ return self.tag - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.filter = data.attrib.get('filter') @@ -946,7 +952,7 @@ class Guid(PlexObject): """ TAG = 'Guid' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.id = data.attrib.get('id') @@ -966,7 +972,7 @@ class Rating(PlexObject): """ TAG = 'Rating' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.image = data.attrib.get('image') @@ -990,7 +996,7 @@ class Review(PlexObject): """ TAG = 'Review' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.filter = data.attrib.get('filter') self.id = utils.cast(int, data.attrib.get('id', 0)) @@ -1014,7 +1020,7 @@ class BaseResource(PlexObject): thumb (str): The URL to retrieve the resource thumbnail. """ - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.key = data.attrib.get('key') self.provider = data.attrib.get('provider') @@ -1084,7 +1090,7 @@ def __repr__(self): offsets = f'{start}-{end}' return f"<{':'.join([self.__class__.__name__, name, offsets])}>" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.end = utils.cast(int, data.attrib.get('endTimeOffset')) self.filter = data.attrib.get('filter') @@ -1118,7 +1124,7 @@ def __repr__(self): offsets = f'{start}-{end}' return f"<{':'.join([self.__class__.__name__, name, offsets])}>" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.end = utils.cast(int, data.attrib.get('endTimeOffset')) self.final = utils.cast(bool, data.attrib.get('final')) @@ -1152,7 +1158,7 @@ class Field(PlexObject): """ TAG = 'Field' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.locked = utils.cast(bool, data.attrib.get('locked')) self.name = data.attrib.get('name') @@ -1172,7 +1178,7 @@ def __repr__(self): score = self._clean(self.firstAttr('score')) return f"<{':'.join([p for p in [self.__class__.__name__, name, score] if p])}>" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.guid = data.attrib.get('guid') self.lifespanEnded = data.attrib.get('lifespanEnded') @@ -1194,7 +1200,7 @@ def __repr__(self): uid = self._clean(self.firstAttr('shortIdentifier')) return f"<{':'.join([p for p in [self.__class__.__name__, uid] if p])}>" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.hasAttribution = data.attrib.get('hasAttribution') self.hasPrefs = data.attrib.get('hasPrefs') @@ -1237,7 +1243,7 @@ def __repr__(self): uid = self._clean(self.firstAttr('name')) return f"<{':'.join([p for p in [self.__class__.__name__, uid] if p])}>" - def _loadData(self, data): + def _loadData(self, data: Element): self.languageCodes = self.listAttrs(data, 'code', etag='Language') self.mediaType = utils.cast(int, data.attrib.get('mediaType')) self.name = data.attrib.get('name') @@ -1271,7 +1277,7 @@ class Availability(PlexObject): def __repr__(self): return f'<{self.__class__.__name__}:{self.platform}:{self.offerType}>' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.country = data.attrib.get('country') self.offerType = data.attrib.get('offerType') diff --git a/plexapi/myplex.py b/plexapi/myplex.py index 0c95aaba0..9eda6b262 100644 --- a/plexapi/myplex.py +++ b/plexapi/myplex.py @@ -1,15 +1,19 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import copy import html import threading import time +from typing import TYPE_CHECKING from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit from xml.etree import ElementTree import requests +from requests.status_codes import _codes as codes -from plexapi import (BASE_HEADERS, CONFIG, TIMEOUT, X_PLEX_ENABLE_FAST_CONNECT, X_PLEX_IDENTIFIER, - log, logfilter, utils) +from plexapi import (BASE_HEADERS, CONFIG, TIMEOUT, X_PLEX_ENABLE_FAST_CONNECT, + X_PLEX_IDENTIFIER, log, logfilter, utils) from plexapi.base import PlexObject from plexapi.client import PlexClient from plexapi.exceptions import BadRequest, NotFound, Unauthorized @@ -17,7 +21,9 @@ from plexapi.server import PlexServer from plexapi.sonos import PlexSonosClient from plexapi.sync import SyncItem, SyncList -from requests.status_codes import _codes as codes + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element class MyPlexAccount(PlexObject): @@ -142,7 +148,7 @@ def signout(self): """ Sign out of the Plex account. Invalidates the authentication token. """ return self.query(self.SIGNOUT, method=self._session.delete) - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self._token = logfilter.add_secret(data.attrib.get('authToken')) @@ -1198,7 +1204,7 @@ class MyPlexUser(PlexObject): TAG = 'User' key = 'https://plex.tv/api/users/' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.friend = self._initpath == self.key @@ -1275,7 +1281,7 @@ class MyPlexInvite(PlexObject): REQUESTS = 'https://plex.tv/api/invites/requests' REQUESTED = 'https://plex.tv/api/invites/requested' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.createdAt = utils.toDatetime(data.attrib.get('createdAt')) @@ -1307,7 +1313,7 @@ class Section(PlexObject): """ TAG = 'Section' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.id = utils.cast(int, data.attrib.get('id')) self.key = utils.cast(int, data.attrib.get('key')) @@ -1345,7 +1351,7 @@ class MyPlexServerShare(PlexObject): """ TAG = 'Server' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.id = utils.cast(int, data.attrib.get('id')) @@ -1430,7 +1436,7 @@ class MyPlexResource(PlexObject): DEFAULT_LOCATION_ORDER = ['local', 'remote', 'relay'] DEFAULT_SCHEME_ORDER = ['https', 'http'] - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.accessToken = logfilter.add_secret(data.attrib.get('accessToken')) self.clientIdentifier = data.attrib.get('clientIdentifier') @@ -1548,7 +1554,7 @@ class ResourceConnection(PlexObject): """ TAG = 'connection' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.address = data.attrib.get('address') self.ipv6 = utils.cast(bool, data.attrib.get('IPv6')) @@ -1591,7 +1597,7 @@ class MyPlexDevice(PlexObject): TAG = 'Device' key = 'https://plex.tv/devices.xml' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.name = data.attrib.get('name') self.publicAddress = data.attrib.get('publicAddress') @@ -1928,7 +1934,7 @@ class AccountOptOut(PlexObject): TAG = 'optOut' CHOICES = {'opt_in', 'opt_out', 'opt_out_managed'} - def _loadData(self, data): + def _loadData(self, data: Element): self.key = data.attrib.get('key') self.value = data.attrib.get('value') @@ -1986,7 +1992,7 @@ class UserState(PlexObject): def __repr__(self): return f'<{self.__class__.__name__}:{self.ratingKey}>' - def _loadData(self, data): + def _loadData(self, data: Element): self.lastViewedAt = utils.toDatetime(data.attrib.get('lastViewedAt')) self.ratingKey = data.attrib.get('ratingKey') self.type = data.attrib.get('type') @@ -2015,7 +2021,7 @@ class GeoLocation(PlexObject): """ TAG = 'location' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.city = data.attrib.get('city') self.code = data.attrib.get('code') diff --git a/plexapi/photo.py b/plexapi/photo.py index 8737d814c..2138d3588 100644 --- a/plexapi/photo.py +++ b/plexapi/photo.py @@ -1,6 +1,9 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import os from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import quote_plus from plexapi import media, utils, video @@ -12,6 +15,9 @@ PhotoalbumEditMixins, PhotoEditMixins ) +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + @utils.registerPlexObject class Photoalbum( @@ -50,7 +56,7 @@ class Photoalbum( TYPE = 'photo' _searchType = 'photoalbum' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self.addedAt = utils.toDatetime(data.attrib.get('addedAt')) self.art = data.attrib.get('art') @@ -149,7 +155,7 @@ def metadataDirectory(self): @utils.registerPlexObject class Photo( - PlexPartialObject, Playable, + Playable, RatingMixin, ArtUrlMixin, PosterUrlMixin, PhotoEditMixins @@ -194,7 +200,7 @@ class Photo( TYPE = 'photo' METADATA_TYPE = 'photo' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Playable._loadData(self, data) self.addedAt = utils.toDatetime(data.attrib.get('addedAt')) @@ -311,7 +317,7 @@ class PhotoSession(PlexSession, Photo): """ _SESSIONTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Photo._loadData(self, data) PlexSession._loadData(self, data) diff --git a/plexapi/playlist.py b/plexapi/playlist.py index 14ef88edb..0c3751176 100644 --- a/plexapi/playlist.py +++ b/plexapi/playlist.py @@ -1,19 +1,25 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import re from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import quote_plus, unquote from plexapi import media, utils -from plexapi.base import Playable, PlexPartialObject +from plexapi.base import Playable from plexapi.exceptions import BadRequest, NotFound, Unsupported from plexapi.library import LibrarySection, MusicSection from plexapi.mixins import SmartFilterMixin, ArtMixin, PosterMixin, PlaylistEditMixins from plexapi.utils import deprecated +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + @utils.registerPlexObject class Playlist( - PlexPartialObject, Playable, + Playable, SmartFilterMixin, ArtMixin, PosterMixin, PlaylistEditMixins @@ -50,7 +56,7 @@ class Playlist( TAG = 'Playlist' TYPE = 'playlist' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Playable._loadData(self, data) self.addedAt = utils.toDatetime(data.attrib.get('addedAt')) diff --git a/plexapi/playqueue.py b/plexapi/playqueue.py index 9835c0dd2..358edd414 100644 --- a/plexapi/playqueue.py +++ b/plexapi/playqueue.py @@ -1,10 +1,16 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + +from typing import TYPE_CHECKING from urllib.parse import quote_plus from plexapi import utils from plexapi.base import PlexObject from plexapi.exceptions import BadRequest +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + class PlayQueue(PlexObject): """Control a PlayQueue. @@ -35,7 +41,7 @@ class PlayQueue(PlexObject): TAG = "PlayQueue" TYPE = "playqueue" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.identifier = data.attrib.get("identifier") self.mediaTagPrefix = data.attrib.get("mediaTagPrefix") diff --git a/plexapi/server.py b/plexapi/server.py index bdd330f78..610c7cf96 100644 --- a/plexapi/server.py +++ b/plexapi/server.py @@ -1,25 +1,30 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import os +from datetime import datetime from functools import cached_property +from typing import TYPE_CHECKING, Optional, Union from urllib.parse import urlencode from xml.etree import ElementTree import requests +from requests.status_codes import _codes as codes -from plexapi import BASE_HEADERS, CONFIG, TIMEOUT, log, logfilter +from plexapi import BASE_HEADERS, CONFIG, TIMEOUT +from plexapi import log, logfilter from plexapi import utils from plexapi.alert import AlertListener from plexapi.base import PlexObject from plexapi.client import PlexClient from plexapi.collection import Collection from plexapi.exceptions import BadRequest, NotFound, Unauthorized -from plexapi.library import Hub, Library, Path, File +from plexapi.library import File, Hub, Library, Path from plexapi.media import Conversion, Optimized from plexapi.playlist import Playlist from plexapi.playqueue import PlayQueue from plexapi.settings import Settings from plexapi.utils import deprecated -from requests.status_codes import _codes as codes # Need these imports to populate utils.PLEXOBJECTS from plexapi import audio as _audio # noqa: F401 @@ -29,6 +34,9 @@ from plexapi import playlist as _playlist # noqa: F401 from plexapi import video as _video # noqa: F401 +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + class PlexServer(PlexObject): """ This is the main entry point to interacting with a Plex server. It allows you to @@ -116,7 +124,7 @@ def __init__(self, baseurl=None, token=None, session=None, timeout=None): data = self.query(self.key, timeout=self._timeout) super(PlexServer, self).__init__(self, data, self.key) - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.allowCameraUpload = utils.cast(bool, data.attrib.get('allowCameraUpload')) @@ -650,7 +658,14 @@ def installUpdate(self): # figure out what method this is.. return self.query(part, method=self._session.put) - def history(self, maxresults=None, mindate=None, ratingKey=None, accountID=None, librarySectionID=None): + def history( + self, + maxresults: Optional[int] = None, + mindate: Optional[datetime] = None, + ratingKey: Optional[Union[int, str]] = None, + accountID: Optional[Union[int, str]] = None, + librarySectionID: Optional[Union[int, str]] = None, + ): """ Returns a list of media items from watched history. If there are many results, they will be fetched from the server in batches of X_PLEX_CONTAINER_SIZE amounts. If you're only looking for the first results, it would be wise to set the maxresults option to that @@ -1091,7 +1106,7 @@ class Account(PlexObject): """ key = '/myplex/account' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.authToken = data.attrib.get('authToken') self.username = data.attrib.get('username') @@ -1112,7 +1127,7 @@ class Activity(PlexObject): """A currently running activity on the PlexServer.""" key = '/activities' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.cancellable = utils.cast(bool, data.attrib.get('cancellable')) self.progress = utils.cast(int, data.attrib.get('progress')) @@ -1127,7 +1142,7 @@ class Release(PlexObject): TAG = 'Release' key = '/updater/status' - def _loadData(self, data): + def _loadData(self, data: Element): self.download_key = data.attrib.get('key') self.version = data.attrib.get('version') self.added = data.attrib.get('added') @@ -1152,7 +1167,7 @@ class SystemAccount(PlexObject): """ TAG = 'Account' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.autoSelectAudio = utils.cast(bool, data.attrib.get('autoSelectAudio')) self.defaultAudioLanguage = data.attrib.get('defaultAudioLanguage') @@ -1181,7 +1196,7 @@ class SystemDevice(PlexObject): """ TAG = 'Device' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.clientIdentifier = data.attrib.get('clientIdentifier') self.createdAt = utils.toDatetime(data.attrib.get('createdAt')) @@ -1207,7 +1222,7 @@ class StatisticsBandwidth(PlexObject): """ TAG = 'StatisticsBandwidth' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.accountID = utils.cast(int, data.attrib.get('accountID')) self.at = utils.toDatetime(data.attrib.get('at')) @@ -1249,7 +1264,7 @@ class StatisticsResources(PlexObject): """ TAG = 'StatisticsResources' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.at = utils.toDatetime(data.attrib.get('at')) self.hostCpuUtilization = utils.cast(float, data.attrib.get('hostCpuUtilization')) @@ -1277,7 +1292,7 @@ class ButlerTask(PlexObject): """ TAG = 'ButlerTask' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.description = data.attrib.get('description') self.enabled = utils.cast(bool, data.attrib.get('enabled')) @@ -1299,7 +1314,7 @@ class Identity(PlexObject): def __repr__(self): return f"<{self.__class__.__name__}:{self.machineIdentifier}>" - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.claimed = utils.cast(bool, data.attrib.get('claimed')) self.machineIdentifier = data.attrib.get('machineIdentifier') diff --git a/plexapi/settings.py b/plexapi/settings.py index c191e3689..a83637017 100644 --- a/plexapi/settings.py +++ b/plexapi/settings.py @@ -1,11 +1,17 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + from collections import defaultdict +from typing import TYPE_CHECKING from urllib.parse import quote from plexapi import log, utils from plexapi.base import PlexObject from plexapi.exceptions import BadRequest, NotFound +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + class Settings(PlexObject): """ Container class for all settings. Allows getting and setting PlexServer settings. @@ -111,7 +117,7 @@ class Setting(PlexObject): 'text': {'type': str, 'cast': str, 'tostr': str}, } - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self.type = data.attrib.get('type') self.advanced = utils.cast(bool, data.attrib.get('advanced')) diff --git a/plexapi/sync.py b/plexapi/sync.py index f57e89d96..b20494152 100644 --- a/plexapi/sync.py +++ b/plexapi/sync.py @@ -23,11 +23,18 @@ def init_sync(): You have to fake platform/device/model because transcoding profiles are hardcoded in Plex, and you obviously have to explicitly specify that your app supports `sync-target`. """ +from __future__ import annotations + +from typing import TYPE_CHECKING + import requests import plexapi from plexapi.base import PlexObject -from plexapi.exceptions import NotFound, BadRequest +from plexapi.exceptions import BadRequest, NotFound + +if TYPE_CHECKING: + from xml.etree.ElementTree import Element class SyncItem(PlexObject): @@ -62,7 +69,7 @@ def __init__(self, server, data, initpath=None, clientIdentifier=None): super(SyncItem, self).__init__(server, data, initpath) self.clientIdentifier = clientIdentifier - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.id = plexapi.utils.cast(int, data.attrib.get('id')) self.version = plexapi.utils.cast(int, data.attrib.get('version')) @@ -117,7 +124,7 @@ class SyncList(PlexObject): key = 'https://plex.tv/devices/{clientId}/sync_items' TAG = 'SyncList' - def _loadData(self, data): + def _loadData(self, data: Element): self._data = data self.clientId = data.attrib.get('clientIdentifier') self.items = [] diff --git a/plexapi/utils.py b/plexapi/utils.py index 8478f2d41..09f1204db 100644 --- a/plexapi/utils.py +++ b/plexapi/utils.py @@ -1,4 +1,6 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import base64 import functools import json @@ -15,6 +17,7 @@ from getpass import getpass from hashlib import sha1 from threading import Event, Thread +from typing import TYPE_CHECKING, Any, Optional, Type, Union, overload from urllib.parse import quote import requests @@ -22,6 +25,9 @@ from plexapi.exceptions import BadRequest, NotFound, Unauthorized +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + try: from tqdm import tqdm except ImportError: @@ -136,7 +142,32 @@ def registerPlexObject(cls): return cls -def cast(func, value): +@overload +def cast(func: Type[bool], value: Any) -> bool: + ... + + +@overload +def cast(func: Type[int], value: Any) -> int: + ... + + +@overload +def cast(func: Type[float], value: Any) -> float: + ... + + +@overload +def cast(func: Type[str], value: Any) -> str: + ... + +# multiple overloads needed as these are primitive types and also classes +# generic type hints do not work here for that reason + + +def cast( + func: Type[Union[int, float, bool, str]], value: Any +) -> Union[int, float, bool, str]: """ Cast the specified value to the specified type (returned by func). Currently this only support str, int, float, bool. Should be extended if needed. @@ -625,7 +656,7 @@ def wrapper(*args, **kwargs): return decorator -def iterXMLBFS(root, tag=None): +def iterXMLBFS(root: Element, tag: Optional[str] = None): """ Iterate through an XML tree using a breadth-first search. If tag is specified, only return nodes with that tag. """ diff --git a/plexapi/video.py b/plexapi/video.py index 4a66c0a4f..10635b76c 100644 --- a/plexapi/video.py +++ b/plexapi/video.py @@ -1,11 +1,14 @@ # -*- coding: utf-8 -*- +from __future__ import annotations + import os from functools import cached_property from pathlib import Path +from typing import TYPE_CHECKING from urllib.parse import quote_plus from plexapi import media, utils -from plexapi.base import Playable, PlexPartialObject, PlexHistory, PlexSession +from plexapi.base import Playable, PlexHistory, PlexPartialObject, PlexSession from plexapi.exceptions import BadRequest from plexapi.mixins import ( AdvancedSettingsMixin, SplitMergeMixin, UnmatchMatchMixin, ExtrasMixin, HubsMixin, PlayedUnplayedMixin, RatingMixin, @@ -14,6 +17,9 @@ WatchlistMixin ) +if TYPE_CHECKING: + from xml.etree.ElementTree import Element + class Video(PlexPartialObject, PlayedUnplayedMixin): """ Base class for all video objects including :class:`~plexapi.video.Movie`, @@ -45,7 +51,7 @@ class Video(PlexPartialObject, PlayedUnplayedMixin): viewCount (int): Count of times the item was played. """ - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ self._data = data self.addedAt = utils.toDatetime(data.attrib.get('addedAt')) @@ -227,7 +233,7 @@ def optimize(self, title='', target='', deviceProfile='', videoQuality=None, """ from plexapi.library import Location - from plexapi.sync import Policy, MediaSettings + from plexapi.sync import MediaSettings, Policy backgroundProcessing = self.fetchItem('/playlists?type=42') key = f'{backgroundProcessing.key}/items' @@ -309,7 +315,7 @@ def sync(self, videoQuality, client=None, clientId=None, limit=None, unwatched=F :class:`~plexapi.sync.SyncItem`: an instance of created syncItem. """ - from plexapi.sync import SyncItem, Policy, MediaSettings + from plexapi.sync import MediaSettings, Policy, SyncItem myplex = self._server.myPlexAccount() sync_item = SyncItem(self._server, None) @@ -382,7 +388,7 @@ class Movie( TYPE = 'movie' METADATA_TYPE = 'movie' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Video._loadData(self, data) Playable._loadData(self, data) @@ -546,7 +552,7 @@ class Show( TYPE = 'show' METADATA_TYPE = 'episode' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Video._loadData(self, data) self.audienceRating = utils.cast(float, data.attrib.get('audienceRating')) @@ -732,7 +738,7 @@ class Season( TYPE = 'season' METADATA_TYPE = 'episode' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Video._loadData(self, data) self.audioLanguage = data.attrib.get('audioLanguage', '') @@ -906,7 +912,7 @@ class Episode( TYPE = 'episode' METADATA_TYPE = 'episode' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Video._loadData(self, data) Playable._loadData(self, data) @@ -1107,7 +1113,7 @@ class Clip( TYPE = 'clip' METADATA_TYPE = 'clip' - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Video._loadData(self, data) Playable._loadData(self, data) @@ -1149,7 +1155,7 @@ def metadataDirectory(self): class Extra(Clip): """ Represents a single Extra (trailer, behindTheScenes, etc). """ - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ super(Extra, self)._loadData(data) parent = self._parent() @@ -1169,7 +1175,7 @@ class MovieSession(PlexSession, Movie): """ _SESSIONTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Movie._loadData(self, data) PlexSession._loadData(self, data) @@ -1182,7 +1188,7 @@ class EpisodeSession(PlexSession, Episode): """ _SESSIONTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Episode._loadData(self, data) PlexSession._loadData(self, data) @@ -1195,7 +1201,7 @@ class ClipSession(PlexSession, Clip): """ _SESSIONTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Clip._loadData(self, data) PlexSession._loadData(self, data) @@ -1208,7 +1214,7 @@ class MovieHistory(PlexHistory, Movie): """ _HISTORYTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Movie._loadData(self, data) PlexHistory._loadData(self, data) @@ -1221,7 +1227,7 @@ class EpisodeHistory(PlexHistory, Episode): """ _HISTORYTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Episode._loadData(self, data) PlexHistory._loadData(self, data) @@ -1234,7 +1240,7 @@ class ClipHistory(PlexHistory, Clip): """ _HISTORYTYPE = True - def _loadData(self, data): + def _loadData(self, data: Element): """ Load attribute values from Plex XML response. """ Clip._loadData(self, data) PlexHistory._loadData(self, data)