chore: change tqdm disable parameter (#113)
* chore: change tqdm disable parameter

* chore: add changelog entry

* feat: add option to read PBF from URL

* chore: add issues to changelog entries

* chore: change multiprocessing spawn method

* chore: add explanation comment

* chore: change url detection

* fix: change multiprocessing queue to manager based

* fix: change cli path detection

* fix: change cli example notebook

* chore: add info about URL to CLI

* chore: changed tests docstrings

* chore: removed unnecessary time reporting

* refactor: simplify parquet multiprocessing algorithm
RaczeQ authored May 29, 2024
1 parent 8018c5f commit 38fc7ba
Showing 13 changed files with 151 additions and 75 deletions.
16 changes: 9 additions & 7 deletions CHANGELOG.md
@@ -7,19 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Changed

- Added new internal parquet dataset processing logic using multiprocessing
- Refactored nodes intersection step from `ST_Intersects` in DuckDB to Shapely's `STRtree`
- `PbfFileReader`'s internal `geometry_filter` is additionally clipped by PBF extract geometry to speed up intersections
- `OsmTagsFilter` and `GroupedOsmTagsFilter` type from `dict` to `Mapping` to make it covariant

### Added

- `geoarrow-rust-core` library to the main dependencies
- Test for hashing geometry filter with mixed order
- Test for parquet multiprocessing logic
- Test for new intersection step
- Option to pass URL directly as PBF path [#114](https://github.com/kraina-ai/quackosm/issues/114)

### Changed

- Added new internal parquet dataset processing logic using multiprocessing
- Refactored nodes intersection step from `ST_Intersects` in DuckDB to Shapely's `STRtree` [#112](https://github.com/kraina-ai/quackosm/issues/112)
- `PbfFileReader`'s internal `geometry_filter` is additionally clipped by PBF extract geometry to speed up intersections [#116](https://github.com/kraina-ai/quackosm/issues/116)
- `OsmTagsFilter` and `GroupedOsmTagsFilter` type from `dict` to `Mapping` to make it covariant
- Tqdm's `disable` parameter for non-TTY environments from `None` to `False`
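
The entry above refers to tqdm's `disable` flag: `None` means "auto", so tqdm hides the bar whenever the output stream is not a TTY (CI logs, pipes), while `False` always renders it. A stdlib-only sketch of that decision rule (the `resolve_disable` helper is illustrative, not QuackOSM or tqdm code):

```python
import sys

def resolve_disable(disable, stream=sys.stderr):
    """Mimic tqdm's handling of the `disable` parameter.

    `None` means "auto": hide the bar when the stream is not a TTY
    (e.g. log files, CI pipelines); `False` forces the bar on; `True` off.
    """
    if disable is None:
        return not getattr(stream, "isatty", lambda: False)()
    return bool(disable)

class FakeStream:
    """Stand-in for a real stream with a controllable TTY flag."""
    def __init__(self, tty):
        self._tty = tty
    def isatty(self):
        return self._tty

# In a non-interactive environment, `None` disables the bar...
assert resolve_disable(None, FakeStream(tty=False)) is True
# ...while `False` keeps it visible, the behavior this commit opts into.
assert resolve_disable(False, FakeStream(tty=False)) is False
```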

## [0.8.1] - 2024-05-11

37 changes: 14 additions & 23 deletions examples/command_line_interface.ipynb
@@ -51,20 +51,9 @@
"cell_type": "markdown",
"metadata": {},
"source": [
"Let's download a small extract and test the basic usage."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import urllib.request\n",
"Let's download a small extract and test the basic usage.\n",
"\n",
"andorra_pbf_url = \"https://download.geofabrik.de/europe/andorra-latest.osm.pbf\"\n",
"andorra_pbf_file = \"andorra.osm.pbf\"\n",
"urllib.request.urlretrieve(andorra_pbf_url, andorra_pbf_file)"
"Because we are passing a URL, QuackOSM will download it automatically and save it in the `files` directory."
]
},
{
@@ -77,14 +66,16 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf"
"! QuackOSM https://download.geofabrik.de/europe/andorra-latest.osm.pbf"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Second execution of this command will immediately return a path to the previously generated file."
"Second execution of this command will immediately return a path to the previously generated file.\n",
"\n",
"Since the file is already downloaded, we can use it directly."
]
},
{
@@ -97,7 +88,7 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf"
"! QuackOSM files/andorra-latest.osm.pbf"
]
},
{
@@ -117,7 +108,7 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf --ignore-cache"
"! QuackOSM files/andorra-latest.osm.pbf --ignore-cache"
]
},
{
@@ -460,7 +451,7 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf --osm-tags-filter '{ \"building\": true }'"
"! QuackOSM files/andorra-latest.osm.pbf --osm-tags-filter '{ \"building\": true }'"
]
},
{
@@ -496,7 +487,7 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --explode --output files/andorra_filtered_exploded.parquet --silent"
"! QuackOSM files/andorra-latest.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --explode --output files/andorra_filtered_exploded.parquet --silent"
]
},
{
@@ -529,7 +520,7 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --compact --output files/andorra_filtered_compact.parquet --silent"
"! QuackOSM files/andorra-latest.osm.pbf --osm-tags-filter '{ \"amenity\": \"parking\", \"building\": \"office\" }' --compact --output files/andorra_filtered_compact.parquet --silent"
]
},
{
@@ -566,7 +557,7 @@
},
"outputs": [],
"source": [
"! QuackOSM andorra.osm.pbf --wkt-result --silent"
"! QuackOSM files/andorra-latest.osm.pbf --wkt-result --silent"
]
},
{
@@ -579,7 +570,7 @@
},
"outputs": [],
"source": [
"! ./duckdb :memory: \"FROM read_parquet('files/andorra_nofilter_noclip_compact_wkt.parquet')\""
"! ./duckdb :memory: \"FROM read_parquet('files/andorra-latest_nofilter_noclip_compact_wkt.parquet')\""
]
}
],
@@ -599,7 +590,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.7"
"version": "3.10.12"
}
},
"nbformat": 4,
84 changes: 52 additions & 32 deletions quackosm/_parquet_multiprocessing.py
@@ -3,13 +3,17 @@
from pathlib import Path
from queue import Empty, Queue
from time import sleep
from typing import Callable, Optional
from typing import Any, Callable, Optional

import pyarrow as pa
import pyarrow.parquet as pq

from quackosm._rich_progress import TaskProgressBar # type: ignore[attr-defined]

# Using `spawn` method to enable integration with Polars and probably other Rust-based libraries
# https://docs.pola.rs/user-guide/misc/multiprocessing/
ctx: multiprocessing.context.SpawnContext = multiprocessing.get_context("spawn")

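
The module-level context added above is the key change: `fork` duplicates the parent's whole address space, including any threads or locks held by Rust-based libraries such as Polars, which can deadlock the child, while `spawn` starts a fresh interpreter. A minimal sketch of the same pattern (standalone, not the module's actual workers):

```python
import multiprocessing
import multiprocessing.context

# One spawn context for the whole module: every Process, Manager and
# Queue created from `ctx` uses spawn, so children begin in a fresh
# interpreter instead of inheriting a fork of the parent's memory.
ctx: multiprocessing.context.SpawnContext = multiprocessing.get_context("spawn")

def square(x: int) -> int:
    # spawn pickles targets by reference and re-imports their module in
    # the child, so targets must live at module top level.
    return x * x

# Constructed from the context; it would run via worker.start()/join().
worker = ctx.Process(target=square, args=(3,))
```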

def _job(
queue: Queue[tuple[str, int]],
@@ -53,13 +57,13 @@ def _job(
writer.close()


class WorkerProcess(multiprocessing.Process):
def __init__(self, *args, **kwargs): # type: ignore[no-untyped-def]
class WorkerProcess(ctx.Process): # type: ignore[name-defined,misc]
def __init__(self, *args: Any, **kwargs: Any):
multiprocessing.Process.__init__(self, *args, **kwargs)
self._pconn, self._cconn = multiprocessing.Pipe()
self._exception: Optional[tuple[Exception, str]] = None

def run(self) -> None: # pragma: no cover
def run(self) -> None: # pragma: no cover
try:
multiprocessing.Process.run(self)
self._cconn.send(None)
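
A child process cannot propagate an exception to its parent simply by raising, so `WorkerProcess` threads one end of a `multiprocessing.Pipe` into the child and sends back either `None` on success or a pickled `(exception, formatted_traceback)` pair. The mechanism can be sketched in-process (the `report` helper is illustrative; it mirrors what the overridden `run()` does around the real work):

```python
import multiprocessing
import traceback

# The parent keeps one end of the pipe, the child the other; the child
# sends None (success) or an (exception, traceback_text) tuple on exit.
parent_conn, child_conn = multiprocessing.Pipe()

def report(conn, func):
    """What the child's run() effectively does around the real work."""
    try:
        func()
        conn.send(None)
    except Exception as e:  # forward anything to the parent
        conn.send((e, traceback.format_exc()))

report(child_conn, lambda: 1 / 0)   # simulate a failing job
exc, tb_text = parent_conn.recv()   # parent polls this after join()
print(type(exc).__name__)           # → ZeroDivisionError
```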
@@ -95,7 +99,7 @@ def map_parquet_dataset(
progress_bar (Optional[TaskProgressBar]): Progress bar to show task status.
Defaults to `None`.
"""
queue: Queue[tuple[str, int]] = multiprocessing.Manager().Queue()
queue: Queue[tuple[str, int]] = ctx.Manager().Queue()

dataset = pq.ParquetDataset(dataset_path)

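
The hunk above swaps a plain `multiprocessing.Queue` for a `Manager().Queue()`. The manager queue is a proxy to a queue living in a separate server process, so it can be shared with spawn-started workers, and its `qsize()` is dependable everywhere (the raw queue's `qsize()` raises `NotImplementedError` on macOS), which matters because the progress loop below polls `queue.qsize()`. A small sketch of the pattern:

```python
import multiprocessing

def demo_manager_queue() -> int:
    """Fill a manager-backed queue and read its size through the proxy."""
    with multiprocessing.Manager() as manager:
        queue = manager.Queue()       # proxy object, safe to hand to workers
        for row_group in range(5):    # e.g. one entry per parquet row group
            queue.put(("file.parquet", row_group))
        return queue.qsize()          # works on every platform

print(demo_manager_queue())  # → 5
```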
@@ -112,39 +116,55 @@
WorkerProcess(
target=_job,
args=(queue, destination_path, function, columns),
) # type: ignore[no-untyped-call]
for _ in range(multiprocessing.cpu_count())
)
for _ in range(min(multiprocessing.cpu_count(), total))
]
_run_processes(processes=processes, queue=queue, total=total, progress_bar=progress_bar)
finally: # pragma: no cover
_report_exceptions(processes=processes)

# Run processes
for p in processes:
p.start()

if progress_bar: # pragma: no cover
progress_bar.create_manual_bar(total=total)
while any(process.is_alive() for process in processes):
if any(p.exception for p in processes): # pragma: no cover
break
def _run_processes(
processes: list[WorkerProcess],
queue: Queue[tuple[str, int]],
total: int,
progress_bar: Optional[TaskProgressBar],
) -> None:
# Run processes
for p in processes:
p.start()

if progress_bar: # pragma: no cover
progress_bar.update_manual_bar(current_progress=total - queue.qsize())
sleep(1)
if progress_bar: # pragma: no cover
progress_bar.create_manual_bar(total=total)

sleep_time = 0.1
while any(process.is_alive() for process in processes):
if any(p.exception for p in processes): # pragma: no cover
break

if progress_bar: # pragma: no cover
progress_bar.update_manual_bar(current_progress=total)
finally: # pragma: no cover
# In case of exception
exceptions = []
for p in processes:
if p.is_alive():
p.terminate()

if p.exception:
exceptions.append(p.exception)

if exceptions:
# use ExceptionGroup in Python3.11
_raise_multiple(exceptions)
progress_bar.update_manual_bar(current_progress=total - queue.qsize())

sleep(sleep_time)
sleep_time = min(1.0, sleep_time + 0.1)

if progress_bar: # pragma: no cover
progress_bar.update_manual_bar(current_progress=total)


def _report_exceptions(processes: list[WorkerProcess]) -> None:
# In case of exception
exceptions = []
for p in processes:
if p.is_alive():
p.terminate()

if p.exception:
exceptions.append(p.exception)

if exceptions:
# use ExceptionGroup in Python3.11
_raise_multiple(exceptions)


def _raise_multiple(exceptions: list[tuple[Exception, str]]) -> None:
2 changes: 1 addition & 1 deletion quackosm/_rich_progress.py
@@ -265,7 +265,7 @@ def stop(self):
if self.console and not self.console.is_interactive:
self.console.print()

if not self.verbosity_mode == "silent":
if not self.verbosity_mode == "silent" and not self.is_new():
end_time = time.time()
elapsed_seconds = end_time - self.start_time
show_total_elapsed_time(elapsed_seconds)
9 changes: 5 additions & 4 deletions quackosm/cli.py
@@ -23,6 +23,7 @@
from quackosm._typing import is_expected_type
from quackosm.functions import convert_geometry_to_parquet, convert_pbf_to_parquet
from quackosm.osm_extracts import OsmExtractSource
from quackosm.pbf_file_reader import _is_url_path

app = typer.Typer(context_settings={"help_option_names": ["-h", "--help"]}, rich_markup_mode="rich")

@@ -34,7 +35,7 @@ def _version_callback(value: bool) -> None:


def _path_callback(ctx: typer.Context, value: Path) -> Path:
if not Path(value).exists():
if not _is_url_path(value) and not Path(value).exists():
raise typer.BadParameter(f"File not found error: {value}")
return value
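
The callback above now consults `_is_url_path` so the CLI skips the local-existence check when the argument is a remote address. QuackOSM's actual detection logic is not shown in this diff; a plausible stdlib sketch (the helper name and the scheme list are assumptions):

```python
from pathlib import Path
from urllib.parse import urlparse

def is_url_path(value) -> bool:
    """Treat the input as a URL when it carries an http(s) scheme."""
    return urlparse(str(value)).scheme in ("http", "https")

def path_callback(value):
    """CLI-style check: URLs pass through, local paths must exist."""
    if not is_url_path(value) and not Path(value).exists():
        raise ValueError(f"File not found error: {value}")
    return value
```

A bare filename like `andorra.osm.pbf` has no scheme, so it still goes through the filesystem check.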

@@ -235,10 +236,10 @@ def _filter_osm_ids_callback(value: str) -> Optional[list[str]]:
@app.command() # type: ignore
def main(
pbf_file: Annotated[
Optional[Path],
Optional[str],
typer.Argument(
help="PBF file to convert into GeoParquet",
metavar="PBF file path",
help="PBF file to convert into GeoParquet. Can be a URL.",
metavar="PBF file path.",
callback=_empty_path_callback,
),
] = None,
4 changes: 2 additions & 2 deletions quackosm/functions.py
@@ -46,7 +46,7 @@ def convert_pbf_to_parquet(
Args:
pbf_path (Union[str, Path, Iterable[Union[str, Path]]]):
Path or list of paths of `*.osm.pbf` files to be parsed.
Path or list of paths of `*.osm.pbf` files to be parsed. Can be a URL.
tags_filter (Union[OsmTagsFilter, GroupedOsmTagsFilter], optional): A dictionary
specifying which tags to download.
The keys should be OSM tags (e.g. `building`, `amenity`).
@@ -472,7 +472,7 @@ def convert_pbf_to_geodataframe(
Args:
pbf_path (Union[str, Path, Iterable[Union[str, Path]]]):
Path or list of paths of `*.osm.pbf` files to be parsed.
Path or list of paths of `*.osm.pbf` files to be parsed. Can be a URL.
tags_filter (Union[OsmTagsFilter, GroupedOsmTagsFilter], optional): A dictionary
specifying which tags to download.
The keys should be OSM tags (e.g. `building`, `amenity`).
4 changes: 2 additions & 2 deletions quackosm/osm_extracts/__init__.py
@@ -270,7 +270,7 @@ def _find_smallest_containing_extracts(
desc="Finding matching extracts",
max_workers=num_of_multiprocessing_workers,
chunksize=ceil(total_polygons / (4 * num_of_multiprocessing_workers)),
disable=True if force_terminal else None,
disable=True if force_terminal else False,
):
unique_extracts_ids.update(extract_ids_list)
else:
@@ -414,7 +414,7 @@ def _filter_extracts(
desc="Filtering extracts",
max_workers=num_of_multiprocessing_workers,
chunksize=ceil(total_geometries / (4 * num_of_multiprocessing_workers)),
disable=True if force_terminal else None,
disable=True if force_terminal else False,
):
filtered_extracts_ids.update(extract_ids_list)
else:
2 changes: 1 addition & 1 deletion quackosm/osm_extracts/bbbike.py
@@ -82,7 +82,7 @@ def _iterate_bbbike_index() -> list[OpenStreetMapExtract]:  # pragma: no cover

force_terminal = os.getenv("FORCE_TERMINAL_MODE", "false").lower() == "true"
for extract_name in tqdm(
extract_names, desc="Iterating BBBike index", disable=True if force_terminal else None
extract_names, desc="Iterating BBBike index", disable=True if force_terminal else False
):
poly_url = f"{BBBIKE_EXTRACTS_INDEX_URL}/{extract_name}/{extract_name}.poly"
polygon = parse_polygon_file(poly_url)
2 changes: 1 addition & 1 deletion quackosm/osm_extracts/osm_fr.py
@@ -48,7 +48,7 @@ def _load_openstreetmap_fr_index() -> gpd.GeoDataFrame:
else: # pragma: no cover
force_terminal = os.getenv("FORCE_TERMINAL_MODE", "false").lower() == "true"
extracts = []
with tqdm(disable=True if force_terminal else None) as pbar:
with tqdm(disable=True if force_terminal else False) as pbar:
extract_soup_objects = _gather_all_openstreetmap_fr_urls("osm_fr", "/", pbar)
pbar.set_description("osm_fr")
for soup_object, id_prefix, directory_url in extract_soup_objects:
