From 38fc7baa6b32db36601e54bffeaabdc46876e878 Mon Sep 17 00:00:00 2001 From: Kamil Raczycki Date: Wed, 29 May 2024 19:53:25 +0200 Subject: [PATCH] chore: change tqdm disable parameter (#113) * chore: change tqdm disable parameter * chore: add changelog entry * feat: add option to read PBF from URL * chore: add issues to changelog entries * chore: change multiprocessing spawn method * chore: add explanation comment * chore: change url detection * fix: change multiprocessing queue to manager based * fix: change cli path detection * fix: change cli example notebook * chore: add info about URL to CLI * chore: changed tests docstrings * chore: removed unnecessary time reporting * refactor: simplify parquet multiprocessing algorithm --- CHANGELOG.md | 16 ++--- examples/command_line_interface.ipynb | 37 +++++------- quackosm/_parquet_multiprocessing.py | 84 +++++++++++++++++---------- quackosm/_rich_progress.py | 2 +- quackosm/cli.py | 9 +-- quackosm/functions.py | 4 +- quackosm/osm_extracts/__init__.py | 4 +- quackosm/osm_extracts/bbbike.py | 2 +- quackosm/osm_extracts/osm_fr.py | 2 +- quackosm/pbf_file_reader.py | 24 +++++++- tests/base/test_cli.py | 9 +++ tests/base/test_pbf_file_reader.py | 9 +++ tests/base/test_url_detection.py | 24 ++++++++ 13 files changed, 151 insertions(+), 75 deletions(-) create mode 100644 tests/base/test_url_detection.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 631e250..91561b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,19 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] -### Changed - -- Added new internal parquet dataset processing logic using multiprocessing -- Refactored nodes intersection step from `ST_Intersects` in DuckDB to Shapely's `STRtree` -- `PbfFileReader`'s internal `geometry_filter` is additionally clipped by PBF extract geometry to speed up intersections -- `OsmTagsFilter` and `GroupedOsmTagsFilter` type from `dict` to `Mapping` to make it covariant - ### Added - `geoarrow-rust-core` library to the main dependencies - Test for hashing geometry filter with mixed order - Test for parquet multiprocessing logic - Test for new intersection step +- Option to pass URL directly as PBF path [#114](https://github.com/kraina-ai/quackosm/issues/114) + +### Changed + +- Added new internal parquet dataset processing logic using multiprocessing +- Refactored nodes intersection step from `ST_Intersects` in DuckDB to Shapely's `STRtree` [#112](https://github.com/kraina-ai/quackosm/issues/112) +- `PbfFileReader`'s internal `geometry_filter` is additionally clipped by PBF extract geometry to speed up intersections [#116](https://github.com/kraina-ai/quackosm/issues/116) +- `OsmTagsFilter` and `GroupedOsmTagsFilter` type from `dict` to `Mapping` to make it covariant +- Tqdm's `disable` parameter for non-TTY environments from `None` to `False` ## [0.8.1] - 2024-05-11 diff --git a/examples/command_line_interface.ipynb b/examples/command_line_interface.ipynb index 5859291..1cec903 100644 --- a/examples/command_line_interface.ipynb +++ b/examples/command_line_interface.ipynb @@ -51,20 +51,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "Let's download a small extract and test the basic usage." - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "import urllib.request\n", + "Let's download a small extract and test the basic usage.\n", "\n", - "andorra_pbf_url = \"https://download.geofabrik.de/europe/andorra-latest.osm.pbf\"\n", - "andorra_pbf_file = \"andorra.osm.pbf\"\n", - "urllib.request.urlretrieve(andorra_pbf_url, andorra_pbf_file)" + "Because we are passing an URL, QuackOSM will download it automatically and save it in the `files` directory." ] }, { @@ -77,14 +66,16 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf" + "! QuackOSM https://download.geofabrik.de/europe/andorra-latest.osm.pbf" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ - "Second execution of this command will immediately return a path to the previously generated file." + "Second execution of this command will immediately return a path to the previously generated file.\n", + "\n", + "Since the file is already downloaded, we can use it directly." ] }, { @@ -97,7 +88,7 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf" + "! QuackOSM files/andorra-latest.osm.pbf" ] }, { @@ -117,7 +108,7 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf --ignore-cache" + "! QuackOSM files/andorra-latest.osm.pbf --ignore-cache" ] }, { @@ -460,7 +451,7 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf --osm-tags-filter '{ \"building\": true }'" + "! QuackOSM files/andorra-latest.osm.pbf --osm-tags-filter '{ \"building\": true }'" ] }, { @@ -496,7 +487,7 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --explode --output files/andorra_filtered_exploded.parquet --silent" + "! QuackOSM files/andorra-latest.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --explode --output files/andorra_filtered_exploded.parquet --silent" ] }, { @@ -529,7 +520,7 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --compact --output files/andorra_filtered_compact.parquet --silent" + "! QuackOSM files/andorra-latest.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --compact --output files/andorra_filtered_compact.parquet --silent" ] }, { @@ -566,7 +557,7 @@ }, "outputs": [], "source": [ - "! QuackOSM andorra.osm.pbf --wkt-result --silent" + "! QuackOSM files/andorra-latest.osm.pbf --wkt-result --silent" ] }, { @@ -579,7 +570,7 @@ }, "outputs": [], "source": [ - "! ./duckdb :memory: \"FROM read_parquet('files/andorra_nofilter_noclip_compact_wkt.parquet')\"" + "! ./duckdb :memory: \"FROM read_parquet('files/andorra-latest_nofilter_noclip_compact_wkt.parquet')\"" ] } ], @@ -599,7 +590,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.7" + "version": "3.10.12" } }, "nbformat": 4, diff --git a/quackosm/_parquet_multiprocessing.py b/quackosm/_parquet_multiprocessing.py index 5a0ede7..5625c4a 100644 --- a/quackosm/_parquet_multiprocessing.py +++ b/quackosm/_parquet_multiprocessing.py @@ -3,13 +3,17 @@ from pathlib import Path from queue import Empty, Queue from time import sleep -from typing import Callable, Optional +from typing import Any, Callable, Optional import pyarrow as pa import pyarrow.parquet as pq from quackosm._rich_progress import TaskProgressBar # type: ignore[attr-defined] +# Using `spawn` method to enable integration with Polars and probably other Rust-based libraries +# https://docs.pola.rs/user-guide/misc/multiprocessing/ +ctx: multiprocessing.context.SpawnContext = multiprocessing.get_context("spawn") + def _job( queue: Queue[tuple[str, int]], @@ -53,13 +57,13 @@ def _job( writer.close() -class WorkerProcess(multiprocessing.Process): - def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def] +class WorkerProcess(ctx.Process): # type: ignore[name-defined,misc] + def __init__(self, *args: Any, **kwargs: Any): multiprocessing.Process.__init__(self, *args, **kwargs) self._pconn, self._cconn = multiprocessing.Pipe() self._exception: Optional[tuple[Exception, str]] = None - def run(self) -> None: # pragma: no cover + def run(self) -> None: # pragma: no cover try: multiprocessing.Process.run(self) self._cconn.send(None) @@ -95,7 +99,7 @@ def map_parquet_dataset( progress_bar (Optional[TaskProgressBar]): Progress bar to show task status. Defaults to `None`. """ - queue: Queue[tuple[str, int]] = multiprocessing.Manager().Queue() + queue: Queue[tuple[str, int]] = ctx.Manager().Queue() dataset = pq.ParquetDataset(dataset_path) @@ -112,39 +116,55 @@ def map_parquet_dataset( WorkerProcess( target=_job, args=(queue, destination_path, function, columns), - ) # type: ignore[no-untyped-call] - for _ in range(multiprocessing.cpu_count()) + ) + for _ in range(min(multiprocessing.cpu_count(), total)) ] + _run_processes(processes=processes, queue=queue, total=total, progress_bar=progress_bar) + finally: # pragma: no cover + _report_exceptions(processes=processes) - # Run processes - for p in processes: - p.start() - if progress_bar: # pragma: no cover - progress_bar.create_manual_bar(total=total) - while any(process.is_alive() for process in processes): - if any(p.exception for p in processes): # pragma: no cover - break +def _run_processes( + processes: list[WorkerProcess], + queue: Queue[tuple[str, int]], + total: int, + progress_bar: Optional[TaskProgressBar], +) -> None: + # Run processes + for p in processes: + p.start() - if progress_bar: # pragma: no cover - progress_bar.update_manual_bar(current_progress=total - queue.qsize()) - sleep(1) + if progress_bar: # pragma: no cover + progress_bar.create_manual_bar(total=total) + + sleep_time = 0.1 + while any(process.is_alive() for process in processes): + if any(p.exception for p in processes): # pragma: no cover + break if progress_bar: # pragma: no cover - progress_bar.update_manual_bar(current_progress=total) - finally: # pragma: no cover - # In case of exception - exceptions = [] - for p in processes: - if p.is_alive(): - p.terminate() - - if p.exception: - exceptions.append(p.exception) - - if exceptions: - # use ExceptionGroup in Python3.11 - _raise_multiple(exceptions) + progress_bar.update_manual_bar(current_progress=total - queue.qsize()) + + sleep(sleep_time) + sleep_time = min(1.0, sleep_time + 0.1) + + if progress_bar: # pragma: no cover + progress_bar.update_manual_bar(current_progress=total) + + +def _report_exceptions(processes: list[WorkerProcess]) -> None: + # In case of exception + exceptions = [] + for p in processes: + if p.is_alive(): + p.terminate() + + if p.exception: + exceptions.append(p.exception) + + if exceptions: + # use ExceptionGroup in Python3.11 + _raise_multiple(exceptions) def _raise_multiple(exceptions: list[tuple[Exception, str]]) -> None: diff --git a/quackosm/_rich_progress.py b/quackosm/_rich_progress.py index ddf2924..236235a 100644 --- a/quackosm/_rich_progress.py +++ b/quackosm/_rich_progress.py @@ -265,7 +265,7 @@ def stop(self): if self.console and not self.console.is_interactive: self.console.print() - if not self.verbosity_mode == "silent": + if not self.verbosity_mode == "silent" and not self.is_new(): end_time = time.time() elapsed_seconds = end_time - self.start_time show_total_elapsed_time(elapsed_seconds) diff --git a/quackosm/cli.py b/quackosm/cli.py index 2edf33f..726052e 100644 --- a/quackosm/cli.py +++ b/quackosm/cli.py @@ -23,6 +23,7 @@ from quackosm._typing import is_expected_type from quackosm.functions import convert_geometry_to_parquet, convert_pbf_to_parquet from quackosm.osm_extracts import OsmExtractSource +from quackosm.pbf_file_reader import _is_url_path app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]}, rich_markup_mode="rich") @@ -34,7 +35,7 @@ def _version_callback(value: bool) -> None: def _path_callback(ctx: typer.Context, value: Path) -> Path: - if not Path(value).exists(): + if not _is_url_path(value) and not Path(value).exists(): raise typer.BadParameter(f"File not found error: {value}") return value @@ -235,10 +236,10 @@ def _filter_osm_ids_callback(value: str) -> Optional[list[str]]: @app.command() # type: ignore def main( pbf_file: Annotated[ - Optional[Path], + Optional[str], typer.Argument( - help="PBF file to convert into GeoParquet", - metavar="PBF file path", + help="PBF file to convert into GeoParquet. Can be an URL.", + metavar="PBF file path.", callback=_empty_path_callback, ), ] = None, diff --git a/quackosm/functions.py b/quackosm/functions.py index af22ac9..373116a 100644 --- a/quackosm/functions.py +++ b/quackosm/functions.py @@ -46,7 +46,7 @@ def convert_pbf_to_parquet( Args: pbf_path (Union[str, Path, Iterable[Union[str, Path]]]): - Path or list of paths of `*.osm.pbf` files to be parsed. + Path or list of paths of `*.osm.pbf` files to be parsed. Can be an URL. tags_filter (Union[OsmTagsFilter, GroupedOsmTagsFilter], optional): A dictionary specifying which tags to download. The keys should be OSM tags (e.g. `building`, `amenity`). @@ -472,7 +472,7 @@ def convert_pbf_to_geodataframe( Args: pbf_path (Union[str, Path, Iterable[Union[str, Path]]]): - Path or list of paths of `*.osm.pbf` files to be parsed. + Path or list of paths of `*.osm.pbf` files to be parsed. Can be an URL. tags_filter (Union[OsmTagsFilter, GroupedOsmTagsFilter], optional): A dictionary specifying which tags to download. The keys should be OSM tags (e.g. `building`, `amenity`). diff --git a/quackosm/osm_extracts/__init__.py b/quackosm/osm_extracts/__init__.py index e0fd849..13b51f3 100644 --- a/quackosm/osm_extracts/__init__.py +++ b/quackosm/osm_extracts/__init__.py @@ -270,7 +270,7 @@ def _find_smallest_containing_extracts( desc="Finding matching extracts", max_workers=num_of_multiprocessing_workers, chunksize=ceil(total_polygons / (4 * num_of_multiprocessing_workers)), - disable=True if force_terminal else None, + disable=True if force_terminal else False, ): unique_extracts_ids.update(extract_ids_list) else: @@ -414,7 +414,7 @@ def _filter_extracts( desc="Filtering extracts", max_workers=num_of_multiprocessing_workers, chunksize=ceil(total_geometries / (4 * num_of_multiprocessing_workers)), - disable=True if force_terminal else None, + disable=True if force_terminal else False, ): filtered_extracts_ids.update(extract_ids_list) else: diff --git a/quackosm/osm_extracts/bbbike.py b/quackosm/osm_extracts/bbbike.py index ba17dd4..e2e2e0c 100644 --- a/quackosm/osm_extracts/bbbike.py +++ b/quackosm/osm_extracts/bbbike.py @@ -82,7 +82,7 @@ def _iterate_bbbike_index() -> list[OpenStreetMapExtract]: # pragma: no cover force_terminal = os.getenv("FORCE_TERMINAL_MODE", "false").lower() == "true" for extract_name in tqdm( - extract_names, desc="Iterating BBBike index", disable=True if force_terminal else None + extract_names, desc="Iterating BBBike index", disable=True if force_terminal else False ): poly_url = f"{BBBIKE_EXTRACTS_INDEX_URL}/{extract_name}/{extract_name}.poly" polygon = parse_polygon_file(poly_url) diff --git a/quackosm/osm_extracts/osm_fr.py b/quackosm/osm_extracts/osm_fr.py index 569faa7..a99b304 100644 --- a/quackosm/osm_extracts/osm_fr.py +++ b/quackosm/osm_extracts/osm_fr.py @@ -48,7 +48,7 @@ def _load_openstreetmap_fr_index() -> gpd.GeoDataFrame: else: # pragma: no cover force_terminal = os.getenv("FORCE_TERMINAL_MODE", "false").lower() == "true" extracts = [] - with tqdm(disable=True if force_terminal else None) as pbar: + with tqdm(disable=True if force_terminal else False) as pbar: extract_soup_objects = _gather_all_openstreetmap_fr_urls("osm_fr", "/", pbar) pbar.set_description("osm_fr") for soup_object, id_prefix, directory_url in extract_soup_objects: diff --git a/quackosm/pbf_file_reader.py b/quackosm/pbf_file_reader.py index fed625f..7060faa 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -29,6 +29,8 @@ import shapely.wkt as wktlib from geoarrow.pyarrow import io from pandas.util._decorators import deprecate, deprecate_kwarg +from pooch import retrieve +from pooch.utils import parse_url from pyarrow_ops import drop_duplicates from shapely.geometry import LinearRing, Polygon from shapely.geometry.base import BaseGeometry, BaseMultipartGeometry @@ -250,7 +252,7 @@ def convert_pbf_to_parquet( Args: pbf_path (Union[str, Path, Iterable[Union[str, Path]]]): - Path or list of paths of `*.osm.pbf` files to be parsed. + Path or list of paths of `*.osm.pbf` files to be parsed. Can be an URL. result_file_path (Union[str, Path], optional): Where to save the geoparquet file. If not provided, will be generated based on hashes from provided tags filter and geometry filter. Defaults to `None`. @@ -597,7 +599,7 @@ def convert_pbf_to_geodataframe( Args: pbf_path (Union[str, Path, Iterable[Union[str, Path]]]): - Path or list of paths of `*.osm.pbf` files to be parsed. + Path or list of paths of `*.osm.pbf` files to be parsed. Can be an URL. keep_all_tags (bool, optional): Works only with the `tags_filter` parameter. Whether to keep all tags related to the element, or return only those defined in the `tags_filter`. When `True`, will override the optional grouping defined @@ -776,6 +778,15 @@ def _parse_pbf_file( ignore_cache: bool = False, save_as_wkt: bool = False, ) -> Path: + if _is_url_path(pbf_path): + pbf_path = retrieve( + pbf_path, + fname=Path(pbf_path).name, + path=self.working_directory, + progressbar=True, + known_hash=None, + ) + if result_file_path.exists() and not ignore_cache: return result_file_path elif result_file_path.with_suffix(".geoparquet").exists() and not ignore_cache: @@ -2620,3 +2631,12 @@ def _group_ways_with_polars(current_ways_group_path: Path, current_destination_p ).write_parquet( current_destination_path ) + + +def _is_url_path(path: Union[str, Path]) -> bool: + # schemes known to pooch library + known_schemes = {"ftp", "https", "http", "sftp", "doi"} + parsed_url = parse_url(str(path)) + if parsed_url["protocol"] in known_schemes: + return True + return False diff --git a/tests/base/test_cli.py b/tests/base/test_cli.py index d2f41cf..a611a16 100644 --- a/tests/base/test_cli.py +++ b/tests/base/test_cli.py @@ -525,6 +525,15 @@ def test_proper_args_without_pbf(args: list[str], expected_result: str) -> None: assert str(Path(expected_result)) in result.stdout +def test_proper_args_with_pbf_url() -> None: + """Test if runs properly with an url path.""" + result = runner.invoke(cli.app, ["https://download.geofabrik.de/europe/monaco-latest.osm.pbf"]) + print(result.stdout) + + assert result.exit_code == 0 + assert str(Path("files/monaco-latest_nofilter_noclip_compact.parquet")) in result.stdout + + @P.parameters("args") # type: ignore @P.case( "OSM tags filter malfunctioned JSON", diff --git a/tests/base/test_pbf_file_reader.py b/tests/base/test_pbf_file_reader.py index 710070c..b2c4df6 100644 --- a/tests/base/test_pbf_file_reader.py +++ b/tests/base/test_pbf_file_reader.py @@ -81,6 +81,15 @@ def test_pbf_to_geoparquet_parsing( ) +def test_pbf_reader_url_path(): # type: ignore + """Test proper URL detection in `PbfFileReader`.""" + file_name = "https://download.geofabrik.de/europe/monaco-latest.osm.pbf" + features_gdf = PbfFileReader().convert_pbf_to_geodataframe( + pbf_path=file_name, explode_tags=True, ignore_cache=True + ) + assert len(features_gdf) > 0 + + def test_pbf_reader_geometry_filtering(): # type: ignore """Test proper spatial data filtering in `PbfFileReader`.""" file_name = "d17f922ed15e9609013a6b895e1e7af2d49158f03586f2c675d17b760af3452e.osm.pbf" diff --git a/tests/base/test_url_detection.py b/tests/base/test_url_detection.py new file mode 100644 index 0000000..fb3b857 --- /dev/null +++ b/tests/base/test_url_detection.py @@ -0,0 +1,24 @@ +"""Tests for proper URL detection.""" + +import pytest + +from quackosm.pbf_file_reader import _is_url_path + + +@pytest.mark.parametrize( + "path,is_url", + [ + ("D:/a/quackosm/files/seychelles.osm.pbf", False), + ("http://127.0.0.1:8080/seychelles.osm.pbf", True), + ("ftp://127.0.0.1:8080/seychelles.osm.pbf", True), + ("/dev/files/seychelles.osm.pbf", False), + ("files/seychelles.osm.pbf", False), + ("/files/seychelles.osm.pbf", False), + ("./files/seychelles.osm.pbf", False), + ("../files/seychelles.osm.pbf", False), + ("http://download.geofabrik.de/africa/seychelles-latest.osm.pbf", True), + ], +) # type: ignore +def test_url_parsing(path: str, is_url: bool) -> None: + """Test if URL detection works.""" + assert _is_url_path(path) == is_url