diff --git a/.github/workflows/python-package.yml b/.github/workflows/python-package.yml
index 44e2d4d..834014e 100644
--- a/.github/workflows/python-package.yml
+++ b/.github/workflows/python-package.yml
@@ -22,6 +22,7 @@ jobs:
          - "3.9"
          - "3.10"
          - "3.11"
+          - "3.12"
        include:
          # ubuntu-latest[22.04] does not have: py36
          - os: "ubuntu-20.04"
diff --git a/CHANGELOG.txt b/CHANGELOG.txt
index 513e0ca..6f6deef 100644
--- a/CHANGELOG.txt
+++ b/CHANGELOG.txt
@@ -2,6 +2,12 @@
 
 1.0 will be a complete api overhaul
 
+0.12.1
+    * typing improvements
+    * added `METADATA_PARSER_FUTURE` environment variable;
+      `export METADATA_PARSER_FUTURE=1` to enable
+    * `is_parsed_valid_url` can now accept a `ParseResultBytes` object
+
 0.12.0
     * drop python 2.7
     * initial typing support
diff --git a/README.rst b/README.rst
index 156ba8e..668a3bf 100644
--- a/README.rst
+++ b/README.rst
@@ -40,6 +40,10 @@ them proper attention is not guaranteed.
 The current MAJOR release is `0`.
 
 A `1` MAJOR release is planned, and will have an entirely different structure and API.
+Future deprecations will raise warnings.
+
+Setting the following environment variable will make future deprecations raise exceptions instead:
+    export METADATA_PARSER_FUTURE=1
 
 Installation
 =============
diff --git a/mypy.ini b/mypy.ini
index d3b03e3..6a85372 100644
--- a/mypy.ini
+++ b/mypy.ini
@@ -1,2 +1,15 @@
+[mypy]
+check_untyped_defs = True
+exclude = (?x)(
+    ^setup\.py$
+    | ^workspace-demos\/
+    )
+
+[mypy-httpbin.*]
+ignore_missing_imports = True
+
+[mypy-pytest_httpbin.*]
+ignore_missing_imports = True
+
 [mypy-requests_toolbelt.*]
 ignore_missing_imports = True
diff --git a/setup.cfg b/setup.cfg
index b5ba680..f65ddae 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -8,6 +8,6 @@ per-file-ignores =
     src/metadata_parser/__init__.py: E501,I202
     tests/*: E501
     tests/_compat.py: F401
-exclude = .eggs/*, .pytest_cache/*, .tox/*, build/*, dist/*
+exclude = .eggs/*, .pytest_cache/*, .tox/*, build/*, dist/*, workspace-demos/*
 application_import_names = metadata_parser
 import_order_style = appnexus
diff --git a/setup.py b/setup.py
index 7d86d97..0f0d02e 100644
--- a/setup.py
+++ b/setup.py
@@ -62,6 +62,7 @@
         "Programming Language :: Python :: 3.9",
         "Programming Language :: Python :: 3.10",
         "Programming Language :: Python :: 3.11",
+        "Programming Language :: Python :: 3.12",
         "Topic :: Text Processing :: Markup :: HTML",
         "Topic :: Software Development :: Libraries :: Python Modules",
     ],
diff --git a/src/metadata_parser/__init__.py b/src/metadata_parser/__init__.py
index f3d43b6..794cd84 100644
--- a/src/metadata_parser/__init__.py
+++ b/src/metadata_parser/__init__.py
@@ -1,5 +1,6 @@
-# stdlib
 import _socket  # noqa: I100,I201  # peername hack, see below
+
+# stdlib
 import cgi  # noqa: I100,I201
 import collections
 import datetime
@@ -10,12 +11,17 @@
 import socket  # peername hack, see below
 import typing
 from typing import Callable
+from typing import Dict
 from typing import Iterable
+from typing import List
 from typing import Optional
+from typing import Tuple
 from typing import TYPE_CHECKING
 from typing import Union
 import unicodedata
+from urllib.parse import _ResultMixinStr  # the annotated result of a `.decode()`
 from urllib.parse import ParseResult
+from urllib.parse import ParseResultBytes
 from urllib.parse import quote as url_quote
 from urllib.parse import unquote as url_unquote
 from urllib.parse import urlparse
@@ -31,16 +37,16 @@ if TYPE_CHECKING:
     from bs4 import Tag as _bs4_Tag
-
 if __debug__:
     # only used for testing. turn off in most production envs with -o flags
-    import pdb  # noqa: F401
     import pprint  # noqa: F401
 
+FUTURE_BEHAVIOR = bool(int(os.getenv("METADATA_PARSER_FUTURE", "0")))
+
 # ==============================================================================
 
-__VERSION__ = "0.12.0"
+__VERSION__ = "0.12.1"
 
 # ------------------------------------------------------------------------------
@@ -51,6 +57,8 @@ def warn_future(message: str) -> None:
     warnings.warn(message, FutureWarning, stacklevel=2)
+    if FUTURE_BEHAVIOR:
+        raise ValueError(message)
 
 
 def warn_user(message: str) -> None:
@@ -76,6 +84,12 @@ def warn_user(message: str) -> None:
 )  # bytes; this is .5MB
 """
+
+TYPES_RESPONSE = Union["DummyResponse", requests.Response]
+TYPES_PEERNAME = Tuple[str, int]  # (ip, port)
+TYPE_URL_FETCH = Tuple[str, str, "ResponseHistory"]
+
+
 # ------------------------------------------------------------------------------
 
 _DISABLE_TLDEXTRACT = bool(
@@ -101,13 +115,12 @@
 # only use for these stdlib packages
 # eventually will not be needed thanks to upstream changes in `requests`
 try:
-    _compatible_sockets: tuple = (
+    _compatible_sockets: Tuple = (
         _socket.socket,
         socket._socketobject,  # type: ignore[attr-defined]
     )
 except AttributeError:
-    _compatible_sockets: tuple = (_socket.socket,)  # type: ignore[no-redef]
-
+    _compatible_sockets: Tuple = (_socket.socket,)  # type: ignore[no-redef]
 # ------------------------------------------------------------------------------
@@ -307,7 +320,7 @@ def get_encoding_from_headers(headers: CaseInsensitiveDict) -> Optional[str]:
 # ------------------------------------------------------------------------------
 
 
-def get_response_peername(resp: requests.Response) -> Optional[str]:
+def get_response_peername(resp: TYPES_RESPONSE) -> Optional[TYPES_PEERNAME]:
     """
     used to get the peername (ip+port) data from the request
     if a socket is found, caches this onto the request object
@@ -315,16 +328,23 @@
     IMPORTANT. this must happen BEFORE any content is consumed.
     `response` is really `requests.models.Response`
+
+    This will UPGRADE the response object to have the following attribute:
+
+    * _mp_peername
     """
-    if not isinstance(resp, requests.Response):
+    if not isinstance(resp, requests.Response) and not isinstance(resp, DummyResponse):
         # raise AllowableError("Not a HTTPResponse")
-        log.debug("Not a HTTPResponse | %s", resp)
+        log.debug("Not a supported HTTPResponse | %s", resp)
+        log.debug("-> received a type of: %s", type(resp))
         return None
 
     if hasattr(resp, "_mp_peername"):
         return resp._mp_peername
 
-    def _get_socket():
+    def _get_socket() -> Optional[socket.socket]:
+        if isinstance(resp, DummyResponse):
+            return None
         i = 0
         while True:
             i += 1
@@ -352,20 +372,21 @@ def _get_socket():
     if sock:
         # only cache if we have a sock
         # we may want/need to call again
-        resp._mp_peername = sock.getpeername()  # type: ignore[attr-defined]
-        return resp._mp_peername  # type: ignore[attr-defined]
-    return None
+        resp._mp_peername = sock.getpeername()  # type: ignore [union-attr]
+    else:
+        resp._mp_peername = None  # type: ignore [union-attr]
+    return resp._mp_peername  # type: ignore [union-attr]
 
 
 # ------------------------------------------------------------------------------
 
 
-def response_peername__hook(resp: requests.Response, *args, **kwargs) -> None:
+def response_peername__hook(resp: TYPES_RESPONSE, *args, **kwargs) -> None:
     get_response_peername(resp)
     # do not return anything
 
 
-def safe_sample(source: typing.AnyStr) -> bytes:
+def safe_sample(source: Union[str, bytes]) -> bytes:
     if isinstance(source, bytes):
         _sample = source[:1024]
     else:
@@ -376,7 +397,7 @@
     return _sample
 
 
-def derive_encoding__hook(resp: requests.Response, *args, **kwargs) -> None:
+def derive_encoding__hook(resp: TYPES_RESPONSE, *args, **kwargs) -> None:
     """
     a note about `requests`
@@ -389,19 +410,24 @@ def derive_encoding__hook(resp: requests.Response, *args, **kwargs) -> None:
     body. This hook exists because users in certain regions may expect the
     servers to not follow RFC and for the default encoding to be different.
""" - resp._encoding_fallback = ENCODING_FALLBACK # type: ignore[attr-defined] + if TYPE_CHECKING: + assert hasattr(resp, "_encoding_fallback") + assert hasattr(resp, "_encoding_content") + assert hasattr(resp, "_encoding_headers") + + resp._encoding_fallback = ENCODING_FALLBACK # modified version, returns `None` if no charset available - resp._encoding_headers = get_encoding_from_headers(resp.headers) # type: ignore[attr-defined] - resp._encoding_content = None # type: ignore[attr-defined] - if not resp._encoding_headers and resp.content: # type: ignore[attr-defined] + resp._encoding_headers = get_encoding_from_headers(resp.headers) + resp._encoding_content = None + if not resp._encoding_headers and resp.content: # html5 spec requires a meta-charset in the first 1024 bytes _sample = safe_sample(resp.content) - resp._encoding_content = get_encodings_from_content(_sample) # type: ignore[attr-defined] - if resp._encoding_content: # type: ignore[attr-defined] + resp._encoding_content = get_encodings_from_content(_sample) + if resp._encoding_content: # it's a list - resp.encoding = resp._encoding_content[0] # type: ignore[attr-defined] + resp.encoding = resp._encoding_content[0] else: - resp.encoding = resp._encoding_headers or resp._encoding_fallback # type: ignore[attr-defined] + resp.encoding = resp._encoding_headers or resp._encoding_fallback # do not return anything @@ -430,7 +456,7 @@ def is_hostname_valid( def is_parsed_valid_url( - parsed: ParseResult, + parsed: Union[ParseResult, ParseResultBytes, _ResultMixinStr], require_public_netloc: Optional[bool] = True, allow_localhosts: Optional[bool] = True, http_only: Optional[bool] = True, @@ -440,6 +466,8 @@ def is_parsed_valid_url( defaults True requires http or https for the scheme """ + if isinstance(parsed, ParseResultBytes): + parsed = parsed.decode() assert isinstance(parsed, ParseResult) if __debug__: log.debug("is_parsed_valid_url = %s", parsed) @@ -644,7 +672,7 @@ def is_url_valid( def url_to_absolute_url( - url_test: str, + url_test: Optional[str], url_fallback: Optional[str] = None, require_public_netloc: Optional[bool] = None, allow_localhosts: Optional[bool] = None, @@ -675,7 +703,7 @@ def url_to_absolute_url( # quickly correct for some dumb mistakes if isinstance(url_test, str): if url_test.lower() in ("http://", "https://"): - url_test = None # type: ignore[assignment] + url_test = None # if we don't have a test url or fallback, we can't generate an absolute if not url_test and not url_fallback: @@ -775,7 +803,7 @@ def __init__( raised: Optional[requests.exceptions.RequestException] = None, code: Optional[int] = None, metadataParser: Optional["MetadataParser"] = None, - response: Optional[requests.Response] = None, + response: Optional[TYPES_RESPONSE] = None, ): self.message = message self.raised = raised @@ -830,7 +858,7 @@ def __init__( self, location: str = "", code: Optional[int] = None, - response: Optional[requests.Response] = None, + response: Optional[TYPES_RESPONSE] = None, metadataParser: Optional["MetadataParser"] = None, ): self.location = location @@ -848,20 +876,20 @@ class DummyResponse(object): and html data """ - text: Optional[str] = None - url: Optional[str] = None - status_code: Optional[int] = None - encoding: Optional[str] = None + text: str + url: str + status_code: int + encoding: str elapsed_seconds: float = 0 - history: Optional["ResponseHistory"] = None - headers: Optional[CaseInsensitiveDict] = None + history: List + headers: CaseInsensitiveDict content: Optional[Union[str, bytes]] = None - 
default_encoding: Optional[str] = None + default_encoding: str def __init__( self, text: str = "", - url: str = DUMMY_URL, + url: str = "", status_code: int = 200, encoding: Optional[str] = None, elapsed_seconds: float = 0, @@ -875,6 +903,7 @@ def __init__( self.status_code = status_code self.elapsed = datetime.timedelta(0, elapsed_seconds) self.headers = headers if headers is not None else CaseInsensitiveDict() + self.history = [] self.content = content # start `encoding` block @@ -885,16 +914,13 @@ def __init__( _sample = safe_sample(text) encodings = get_encodings_from_content(_sample) if encodings: - self.encoding = encodings[0] - if default_encoding: - self.default_encoding = default_encoding + self.encoding = encoding = encodings[0] + self.default_encoding = default_encoding or ENCODING_FALLBACK # second phase cleanup - if not self.encoding: - self.encoding = self.default_encoding or ENCODING_FALLBACK + if not encoding: + self.encoding = self.default_encoding # end `encoding` block - # TODO? encode/decode html - # ------------------------------------------------------------------------------ @@ -902,7 +928,7 @@ def __init__( class ResponseHistory(object): history: Optional[Iterable] = None - def __init__(self, resp: requests.Response): + def __init__(self, resp: TYPES_RESPONSE): """ :param resp: A :class:`requests.Response` object to compute history of :type resp: class:`requests.Response` @@ -980,17 +1006,19 @@ class ParsedResult(object): readme/docs are not necessarily installed locally. """ - metadata: dict - soup: BeautifulSoup + metadata: Dict + soup: Optional[BeautifulSoup] = None response_history: Optional[ ResponseHistory ] = None # only stashing `ResponseHistory` if we have it _version: int = 1 # version tracking default_encoder: Optional[Callable] = None - og_minimum_requirements: list = ["title", "type", "image", "url"] - twitter_sections: list = ["card", "title", "site", "description"] - strategy: Union[list, str] = ["og", "dc", "meta", "page", "twitter"] + og_minimum_requirements: List = ["title", "type", "image", "url"] + twitter_sections: List = ["card", "title", "site", "description"] + strategy: Union[List, str] = ["og", "dc", "meta", "page", "twitter"] + + _get_metadata__last_strategy: Optional[str] = None def __init__(self): self.metadata = { @@ -1016,7 +1044,7 @@ def _add_discovered( _target_container_key: str, _target_key: str, _raw_value: str, - _formatted_value: Optional[Union[str, dict]] = None, + _formatted_value: Optional[Union[str, Dict]] = None, ): """ unified function to add data. 
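
A usage sketch (not part of this diff) of the `ParseResultBytes` handling added to `is_parsed_valid_url` above: `urlparse` returns a `ParseResultBytes` for `bytes` input, which 0.12.1 now decodes before validation. This mirrors the tests added in `tests/test_url_parsing.py` below and assumes only the package-level export used there:

    from urllib.parse import urlparse
    import metadata_parser

    # str input yields a ParseResult; bytes input yields a ParseResultBytes.
    # As of 0.12.1, is_parsed_valid_url() accepts both.
    assert metadata_parser.is_parsed_valid_url(urlparse("https://example.com/A.html"))
    assert metadata_parser.is_parsed_valid_url(urlparse(b"https://example.com/A.html"))
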
@@ -1031,7 +1059,7 @@ def _add_discovered(
             _target_container[_target_key] = _formatted_value
         else:
             _current_value = _target_container[_target_key]
-            if type(_current_value) != list:
+            if not isinstance(_current_value, list):
                 if _formatted_value == _current_value:
                     return
                 _target_container[_target_key] = [
@@ -1042,20 +1070,34 @@
             if _formatted_value not in _target_container[_target_key]:
                 _target_container[_target_key].append(_formatted_value)
 
-    def _coerce_strategy(self, strategy: Union[list, str, None] = None):
+    def _coerce_validate_strategy(
+        self,
+        strategy: Union[list, str, None] = None,
+    ) -> Union[List, str]:
         """normalize a strategy into a valid option"""
         if strategy:
-            if type(strategy) is not list:
+            if isinstance(strategy, str):
                 if strategy != "all":
                     warn_user(
                         """If `strategy` is not a `list`, it should be 'all'."""
-                        """This was coerced into a list, but will be enforced."""
+                        """This is coerced into a list, but will be enforced."""
                     )
-                if strategy in self.strategy:
-                    strategy = [strategy]
-                else:
+                    if strategy not in self.strategy:
                         raise ValueError("invalid strategy: %s" % strategy)
-        if strategy is None:
+                    strategy = [
+                        strategy,
+                    ]
+            elif isinstance(strategy, list):
+                _invalids = []
+                for _candidate in strategy:
+                    if _candidate not in self.strategy:
+                        _invalids.append(_candidate)
+                if "all" in strategy:
+                    raise ValueError('Submit "all" as a `str`, not in a `list`.')
+                if _invalids:
+                    raise ValueError("invalid strategy: %s" % _invalids)
+        else:
+            # use our default list
             strategy = self.strategy
         return strategy
@@ -1064,22 +1106,38 @@ def get_metadata(
         self,
         field: str,
         strategy: Union[list, str, None] = None,
         encoder: Optional[Callable] = None,
-    ):
+    ) -> Union[str, Dict[str, Union[str, Dict]], None]:
         """
-        legacy. DEPRECATED.
+        LEGACY. DEPRECATED. DO NOT USE THIS.
+
+        `get_metadata` looks for the field in various stores.
 
         defaults to the core strategy, though you may specify a certain item.
         if you search for 'all' it will return a dict of all values.
 
         This is a legacy method and is being deprecated in favor of
         `get_metadatas`
 
-        This method will always return a string, however it is possible that the
-        field contains multiple elements or even a dict if the source was dublincore.
-        `get_metadatas` will always return a list.
+        This method will always return a string for the field value, however it
+        is possible the field contains multiple elements or even a dict if the
+        source was dublincore.
+
+        In comparison, `get_metadatas` will always return a list for the values.
 
         In the case of DC/DublinCore metadata, this will return the first
         'simple' pairing (key/value - without a scheme/language) or the first
         element if no simple match exists.
+        This function will return different types depending on the input:
+
+        if `strategy` is a single item (a `str` other than "all", coerced into a list):
+            `str` or `None`
+
+        if `strategy` is a list:
+            `str` or `None`, with `str` being the first match;
+            self._get_metadata__last_strategy will persist the matching strategy
+
+        if `strategy` is "all":
+            `dict` of {strategy: result}
+
         :param field: The field to retrieve
         :type field: str
@@ -1100,20 +1158,21 @@
             """`get_metadata` returns a string and is being deprecated"""
             """in favor of `get_metadatas` which returns a list."""
         )
-        strategy = self._coerce_strategy(strategy)
+        strategy = self._coerce_validate_strategy(strategy)
+        self._get_metadata__last_strategy = None
 
         if encoder is None:
             encoder = self.default_encoder
         elif encoder == "raw":
             encoder = None
 
-        def _lookup(store):
+        def _lookup(store: str) -> Optional[Union[str, Dict]]:
             if field in self.metadata[store]:
                 val = self.metadata[store][field]
                 if store == "dc":
                     # dublincore will be different. it uses dicts by default
                     # this is a one-element match
-                    if type(val) == dict:
+                    if isinstance(val, dict):
                         val = val["content"]
                     else:
                         _opt = None
@@ -1125,23 +1184,24 @@ def _lookup(store):
                         _opt = val[0]["content"]
                     val = _opt
                 else:
-                    if type(val) == list:
+                    if isinstance(val, list):
                         val = val[0]
                 if encoder:
                     val = encoder(val)
                 return val
             return None
 
-        # `_coerce_strategy` ensured a compliant strategy
-        if type(strategy) is list:
+        # `_coerce_validate_strategy` ensured a compliant strategy
+        if isinstance(strategy, list):
             for store in strategy:
                 if store in self.metadata:
                     val = _lookup(store)
                     if val is not None:
+                        self._get_metadata__last_strategy = store
                         return val
             return None
         elif strategy == "all":
-            rval = {}
+            rval: Dict = {}
             for store in self.metadata:
                 if store == "_v":
                     continue
@@ -1149,15 +1209,15 @@ def _lookup(store):
                 val = _lookup(store)
                 rval[store] = val
             return rval
-
-        return None
+        else:
+            raise ValueError("unsupported strategy")
 
     def get_metadatas(
         self,
-        field,
+        field: str,
         strategy: Union[list, str, None] = None,
         encoder: Optional[Callable] = None,
-    ):
+    ) -> Optional[Union[Dict, List]]:
         """
         looks for the field in various stores. defaults to the core strategy,
         though you may specify a certain item. if you search for
@@ -1182,25 +1242,28 @@ def get_metadatas(
         :type encoder: function or "raw"
         """
-        strategy = self._coerce_strategy(strategy)
+        strategy = self._coerce_validate_strategy(strategy)
 
         if encoder is None:
             encoder = self.default_encoder
         elif encoder == "raw":
             encoder = None
 
-        def _lookup(store):
+        def _lookup(store: str) -> Optional[List]:
             if field in self.metadata[store]:
                 val = self.metadata[store][field]
-                if type(val) != list:
-                    val = [val]
+                if not isinstance(val, list):
+                    val = [
+                        val,
+                    ]
                 if encoder:
                     val = [encoder(v) for v in val]
                 return val
             return None
 
-        # `_coerce_strategy` ensured a compliant strategy
-        if type(strategy) is list:
+        # `_coerce_validate_strategy` ensured a compliant strategy
+        if isinstance(strategy, list):
+            # returns List or None
             for store in strategy:
                 if store in self.metadata:
                     val = _lookup(store)
@@ -1208,7 +1271,8 @@ def _lookup(store):
                     return val
             return None
         elif strategy == "all":
-            rval = {}
+            # returns Dict or None
+            rval: Dict = {}
             for store in self.metadata:
                 if store == "_v":
                     continue
@@ -1216,10 +1280,10 @@ def _lookup(store):
                 val = _lookup(store)
                 rval[store] = val
             return rval
+        else:
+            raise ValueError("unsupported strategy")
 
-        return None
-
-    def is_opengraph_minimum(self):
+    def is_opengraph_minimum(self) -> bool:
         """
         returns true/false if the page has the minimum amount of opengraph tags
         """
@@ -1292,7 +1356,7 @@ class MetadataParser(object):
     require_public_netloc = None
     force_doctype = None
     requests_timeout = None
-    peername = None
+    peername: Optional[TYPES_PEERNAME] = None
     is_redirect = None
     is_redirect_unique = None
     is_redirect_same_host = None
@@ -1450,7 +1514,7 @@ def __init__(
             cached_urlparser = UrlParserCacheable()  # a cache
             self._cached_urlparser = cached_urlparser  # stash it
             self.urlparse = cached_urlparser.urlparse
-        elif type(cached_urlparser) is int:
+        elif isinstance(cached_urlparser, int):
             cached_urlparser = UrlParserCacheable(
                 maxitems=cached_urlparser
             )  # a cache
@@ -1467,7 +1531,7 @@ def __init__(
         self.ssl_verify = ssl_verify
         self.force_doctype = force_doctype
         self.response = None
-        self.response_headers: dict = {}
+        self.response_headers: Dict = {}
         self.require_public_netloc = require_public_netloc
         self.allow_localhosts = allow_localhosts
         self.requests_timeout = requests_timeout
@@ -1514,7 +1578,7 @@ def __init__(
         if url:
             if defer_fetch:
 
-                def deferred_fetch():
+                def deferred_fetch() -> None:
                     (html, html_encoding, _response_history) = self.fetch_url(
                         url_data=url_data,
                         url_headers=url_headers,
@@ -1552,30 +1616,45 @@ def deferred_fetch():
 
     @property
     def metadata(self):
         # deprecating in 1.0
+        warn_future(
+            "MetadataParser.metadata is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.metadata
 
     @property
     def metadata_version(self):
         # deprecating in 1.0
+        warn_future(
+            "MetadataParser.metadata_version is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.metadata_version
 
     @property
     def metadata_encoding(self):
         # deprecating in 1.0
+        warn_future(
+            "MetadataParser.metadata_encoding is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.metadata_encoding
 
     @property
     def soup(self):
         # deprecating in 1.0
+        warn_future(
+            "MetadataParser.soup is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.soup
 
     def get_metadata(
         self,
-        field,
+        field: str,
         strategy: Union[list, str, None] = None,
         encoder: Optional[Callable] = None,
-    ):
+    ) -> Union[str, Dict[str, Union[str, Dict]], None]:
         # deprecating in 1.0; operate on the result instead
+        warn_future(
+            "MetadataParser.get_metadata is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.get_metadata(
             field, strategy=strategy, encoder=encoder
         )
@@ -1585,14 +1664,20 @@ def get_metadatas(
         field,
         strategy: Union[list, str, None] = None,
         encoder: Optional[Callable] = None,
-    ):
+    ) -> Optional[Union[Dict, List]]:
         # deprecating in 1.0; operate on the result instead
+        warn_future(
+            "MetadataParser.get_metadatas is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.get_metadatas(
             field, strategy=strategy, encoder=encoder
         )
 
-    def is_opengraph_minimum(self):
+    def is_opengraph_minimum(self) -> bool:
         # deprecating in 1.0
+        warn_future(
+            "MetadataParser.is_opengraph_minimum is deprecated in 1.0; Operate on the parsed result directly."
+        )
         return self.parsed_result.is_opengraph_minimum()
 
     # --------------------------------------------------------------------------
@@ -1603,7 +1688,7 @@ def deferred_fetch(self):
     # --------------------------------------------------------------------------
 
-    def _response_encoding(self):
+    def _response_encoding(self) -> Optional[str]:
         if self.response:
             return self.response.encoding
         return self.default_encoding or ENCODING_FALLBACK
@@ -1622,7 +1707,7 @@ def fetch_url(
         derive_encoding=None,
         default_encoding=None,
         retry_dropped_without_headers=None,
-    ):
+    ) -> TYPE_URL_FETCH:
         """
         fetches the url and returns a tuple of (html, html_encoding).
         this was busted out so you could subclass.
@@ -1701,6 +1786,7 @@
         # if someone does usertracking with sharethis.com, they get a hashbang
         # like this: http://example.com/page#.UHeGb2nuVo8
         # that fucks things up.
+        assert self.url
         url = self.url.split("#")[0]
 
         # scoping for return values
@@ -1725,7 +1811,7 @@
             derive_encoding if derive_encoding is not None else self.derive_encoding
         )
 
-        def _run_in_session(_requests_session):
+        def _run_in_session(_requests_session: requests.Session):
             """
             perform the http(s) fetching in a nested function
             this allows us to use a context-manager for generated sessions
@@ -1885,6 +1971,7 @@ def _run_in_session(_requests_session):
             if hasattr(error, "response") and (error.response is not None):
                 self.response = error.response
                 try:
+                    assert self.response is not None  # mypy
                     self.peername = get_response_peername(self.response)
                     if self.response.history:
                         self.is_redirect = True
@@ -1904,7 +1991,7 @@
 
         return (html, html_encoding, response_history)
 
-    def absolute_url(self, link=None):
+    def absolute_url(self, link: Optional[str] = None) -> Optional[str]:
         """
         makes the url of a submitted `link` absolute,
         as sometimes people use a relative url. sigh.
@@ -1920,7 +2007,7 @@
             urlparser=self.urlparse,
         )
 
-    def make_soup(self, html, **kwargs_bs):
+    def make_soup(self, html: str, **kwargs_bs) -> BeautifulSoup:
         """
         Turns an HTML string into a BeautifulSoup document.
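
A minimal sketch (not part of this diff) of how the deprecation shims above interact with `METADATA_PARSER_FUTURE`: `FUTURE_BEHAVIOR` is read once at import time, so the variable must be set before `metadata_parser` is imported; `warn_future` then raises a `ValueError` instead of only emitting a `FutureWarning`. The fixture HTML is hypothetical:

    import os

    os.environ["METADATA_PARSER_FUTURE"] = "1"  # must be set before the import

    import metadata_parser

    page = metadata_parser.MetadataParser(url=None, html="<html><head></head></html>")
    try:
        page.soup  # deprecated accessor, routed through warn_future()
    except ValueError as exc:
        print(exc)  # "MetadataParser.soup is deprecated in 1.0; ..."
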
@@ -1954,7 +2041,7 @@ def parse(
         html_encoding: Optional[str] = None,
         support_malformed: Optional[bool] = None,
         response_history: Optional[ResponseHistory] = None,
-    ):
+    ) -> None:
         """
         parses submitted `html`
@@ -1977,7 +2064,7 @@
             self.parsed_result.response_history = response_history
 
         if not isinstance(html, BeautifulSoup):
-            kwargs_bs: dict = {}
+            kwargs_bs: Dict = {}
             # todo: use html_encoding
             if self.force_doctype:
                 html = RE_doctype.sub("", html)
@@ -2206,9 +2293,8 @@
         if __debug__:
             if TESTING:
                 pprint.pprint(self.parsed_result.__dict__)
-                # pdb.set_trace()
 
-    def get_url_scheme(self):
+    def get_url_scheme(self) -> Optional[str]:
         """try to determine the scheme"""
         candidate = self.url_actual or None
         if candidate is None:
@@ -2219,7 +2305,11 @@
             return parsed.scheme
         return None
 
-    def upgrade_schemeless_url(self, url: str, field: Optional[str] = None):
+    def upgrade_schemeless_url(
+        self,
+        url: str,
+        field: Optional[str] = None,
+    ) -> str:
         """
         urls can appear in html 3 ways:
@@ -2246,7 +2336,7 @@ def get_fallback_url(
         self,
         require_public_netloc: bool = True,
         allow_localhosts: bool = True,
-    ):
+    ) -> Optional[str]:
         for _fallback_candndiate in (self.url_actual, self.url):
             if not _fallback_candndiate:
                 continue
@@ -2270,7 +2360,7 @@ def get_url_canonical(
         require_public_global: bool = True,
         url_fallback: Optional[str] = None,
         allow_unicode_url: bool = True,
-    ):
+    ) -> Optional[str]:
         """
         this was originally part of `get_discrete_url`
@@ -2280,6 +2370,7 @@
             allow_unicode_url=True
         """
         _candidates = self.parsed_result.get_metadatas("canonical", strategy=["page"])
+        # get_metadatas returns a list, so find the first canonical item
         _candidates = [c for c in _candidates if c] if _candidates else []
         if not _candidates:
@@ -2344,7 +2435,7 @@ def get_url_opengraph(
         require_public_global: bool = True,
         url_fallback: Optional[str] = None,
         allow_unicode_url: bool = True,
-    ):
+    ) -> Optional[str]:
         """
         this was originally part of `get_discrete_url`
@@ -2420,7 +2511,7 @@ def get_discrete_url(
         canonical_first: bool = False,
         require_public_global: bool = True,
         allow_unicode_url: bool = True,
-    ):
+    ) -> Optional[str]:
         """
         convenience method.
         if `require_public_global` is True, it will try to correct
@@ -2476,7 +2567,7 @@ def get_metadata_link(
         strategy: Union[list, str, None] = None,
         allow_encoded_uri: bool = False,
         require_public_global: bool = True,
-    ):
+    ) -> Optional[str]:
         """sometimes links are bad; this tries to fix them.
         most useful for meta images
 
         args:
diff --git a/tests/test_document_parsing.py b/tests/test_document_parsing.py
index 10308c9..a1e5d23 100644
--- a/tests/test_document_parsing.py
+++ b/tests/test_document_parsing.py
@@ -1,5 +1,6 @@
 # stdlib
 import os
+from typing import Dict
 import unittest
 
 # local
@@ -20,7 +21,7 @@ doc_base = """
 %(head)s"""
 
-docs = {
+docs: Dict = {
     "good-canonical-absolute": {
         "url-real": """http://example.com""",
         "head": {
@@ -147,7 +148,7 @@ def encoder_capitalizer(decoded):
-    if type(decoded) == dict:
+    if type(decoded) is dict:
         return {k.upper(): v.upper() for k, v in decoded.items()}
     return decoded.upper()
@@ -524,11 +525,15 @@ def test_encoding_fallback(self):
         """this tests simple.html to have certain fields"""
         html = """body"""
         parsed = metadata_parser.MetadataParser(url=None, html=html)
+        # typing scope
+        assert parsed.response is not None
         self.assertEqual(parsed.response.encoding, "ISO-8859-1")
 
     def test_encoding_declared(self):
         html = """body"""
         parsed = metadata_parser.MetadataParser(url=None, html=html)
+        # typing scope
+        assert parsed.response is not None
         self.assertEqual(parsed.response.encoding, "UTF-8")
 
     def test_complex_html(self):
@@ -560,6 +565,7 @@ def test_complex_html(self):
         _computed_link = parsed.get_metadata_link("image", strategy=["og"])
         assert _computed_link == "https://www.example.com/meta/property=og:image"
         _all_og_images = parsed.get_metadatas("og:image")
+        assert _all_og_images is not None
         assert len(_all_og_images) == 3
         assert "https://www.example.com/meta/property=og:image" in _all_og_images
         # bs4 cleans up the ampersand internally into an entity, but prints it deserialized by default
@@ -1187,6 +1193,82 @@ def test_complex_html(self):
             parsed.get_metadata_link("thumbnail-3", strategy=["meta"]), None
         )
 
+        # this should error!
+        with self.assertRaises(ValueError) as cm:
+            parsed.get_metadatas("canonical", strategy=["all"])
+        self.assertEqual(
+            cm.exception.args[0], 'Submit "all" as a `str`, not in a `list`.'
+        )
+
+        # ok, now test the return types
+        # some behavior was changed in the .7 release
+
+        # get_metadata - single section
+        self.assertEqual(
+            parsed.get_metadata("canonical", strategy=["page"]),
+            "http://example.com/meta/rel=canonical",
+        )
+        self.assertEqual(parsed.get_metadata("canonical", strategy=["meta"]), None)
+        self.assertEqual(
+            parsed.get_metadata("canonical", strategy="all"),
+            {"page": "http://example.com/meta/rel=canonical"},
+        )
+
+        # get_metadatas - single section
+        self.assertEqual(
+            parsed.get_metadatas("canonical", strategy=["page"]),
+            [
+                "http://example.com/meta/rel=canonical",
+            ],
+        )
+        self.assertEqual(parsed.get_metadatas("canonical", strategy=["meta"]), None)
+        self.assertEqual(
+            parsed.get_metadatas("canonical", strategy="all"),
+            {
+                "page": [
+                    "http://example.com/meta/rel=canonical",
+                ]
+            },
+        )
+
+        # get_metadata - multiple section
+        self.assertEqual(
+            parsed.get_metadata("description", strategy=["meta"]), "meta.description"
+        )
+        self.assertEqual(
+            parsed.get_metadata("description", strategy="all"),
+            {
+                "og": "meta.property=og:description",
+                "meta": "meta.description",
+                "twitter": "meta.name=twitter:description",
+            },
+        )
+
+        # get_metadatas - multiple section
+        self.assertEqual(
+            parsed.get_metadatas("description", strategy=["meta"]), ["meta.description"]
+        )
+        self.assertEqual(
+            parsed.get_metadatas("description", strategy="all"),
+            {
+                "og": [
+                    "meta.property=og:description",
+                ],
+                "meta": [
+                    "meta.description",
+                ],
+                "twitter": ["meta.name=twitter:description"],
+            },
+        )
+
+        # multiple candidates!
+        self.assertEqual(
+            parsed.get_metadata("keywords", strategy=["meta"]), "meta.keywords:1"
+        )
+        self.assertEqual(
+            parsed.get_metadatas("keywords", strategy=["meta"]),
+            ["meta.keywords:1", "meta.keywords:2"],
+        )
+
     def test_malformed_twitter(self):
         """
         this tests simple.html to have certain fields
diff --git a/tests/test_sessions.py b/tests/test_sessions.py
index f77304e..d6812be 100644
--- a/tests/test_sessions.py
+++ b/tests/test_sessions.py
@@ -1,4 +1,5 @@
 # stdlib
+from typing import Optional
 import unittest
 
 # pypi
@@ -74,10 +75,14 @@ def test_custom_session(self):
         num_redirects = 4
         url = self.httpbin_server.url + "/redirect/%s" % num_redirects
         with SessionRedirect() as s:
+            page: Optional[metadata_parser.MetadataParser]
             try:
                 page = metadata_parser.MetadataParser(url=url, requests_session=s)
             except metadata_parser.NotParsableJson as e:
                 page = e.metadataParser
+            # typing scope
+            assert page is not None
+            assert page.response is not None
             # we end on get
             self.assertEqual(page.response.url, self.httpbin_server.url + "/get")
             # the session should have checked the following responses: redirects + final
diff --git a/tests/test_url_parsing.py b/tests/test_url_parsing.py
index 62aaea4..0fcdcbe 100644
--- a/tests/test_url_parsing.py
+++ b/tests/test_url_parsing.py
@@ -2,6 +2,8 @@
 # stdlib
 import unittest
 
+from urllib.parse import ParseResult
+from urllib.parse import ParseResultBytes
 from urllib.parse import urlparse
 
 # local
@@ -454,3 +456,23 @@ def test_canonical_first__good(self):
         parsed_url = parsed.get_discrete_url(  # noqa: F841
             og_first=False, canonical_first=True
         )
+
+
+class TestCommands(unittest.TestCase, _DocumentCanonicalsMixin):
+    """
+    python -m unittest tests.test_url_parsing.TestCommands
+    """
+
+    def test_is_parsed_valid_url__string(self):
+        url = "https://example.com/A.html"
+        parsed = urlparse(url)
+        self.assertIsInstance(parsed, ParseResult)
+        is_valid = metadata_parser.is_parsed_valid_url(parsed)
+        self.assertTrue(is_valid)
+
+    def test_is_parsed_valid_url__bytes(self):
+        url = b"https://example.com/A.html"
+        parsed = urlparse(url)
+        self.assertIsInstance(parsed, ParseResultBytes)
+        is_valid = metadata_parser.is_parsed_valid_url(parsed)
+        self.assertTrue(is_valid)
diff --git a/tox.ini b/tox.ini
index f425584..9fe56c5 100644
--- a/tox.ini
+++ b/tox.ini
@@ -2,7 +2,7 @@
 envlist =
     lint,
     mypy,
-    py36,py37,py38,py39,py310,py311
+    py36,py37,py38,py39,py310,py311,py312
 
 [testenv]
 commands =
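
Condensed from the new tests in `tests/test_document_parsing.py` above, a sketch of the return-type rules that `_coerce_validate_strategy` now enforces (the fixture HTML is hypothetical; values mirror the test assertions):

    import metadata_parser

    html = '<html><head><meta name="description" content="meta.description"/></head></html>'
    page = metadata_parser.MetadataParser(url=None, html=html)
    result = page.parsed_result

    # a list strategy returns the first match as a str (get_metadata),
    # or all matches for that store as a list (get_metadatas)
    result.get_metadata("description", strategy=["meta"])   # "meta.description"
    result.get_metadatas("description", strategy=["meta"])  # ["meta.description"]

    # "all" must be passed as a str; it returns a dict keyed by matching store
    result.get_metadatas("description", strategy="all")     # {"meta": ["meta.description"]}

    # "all" inside a list raises: 'Submit "all" as a `str`, not in a `list`.'
    try:
        result.get_metadatas("description", strategy=["all"])
    except ValueError as exc:
        print(exc)
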