From da1b0c1bd9554d6159f418c8b62324c85965de8a Mon Sep 17 00:00:00 2001 From: Kamil Raczycki Date: Wed, 13 Mar 2024 20:02:06 +0100 Subject: [PATCH] feat: added option to save geometry in the wkt format --- README.md | 2 + examples/command_line_interface.ipynb | 35 ++++++++ quackosm/cli.py | 11 +++ quackosm/functions.py | 18 ++++- quackosm/pbf_file_reader.py | 112 +++++++++++++++++++------- tests/base/test_cli.py | 3 + tests/base/test_pbf_file_reader.py | 7 +- 7 files changed, 153 insertions(+), 35 deletions(-) diff --git a/README.md b/README.md index c50e109..51489db 100644 --- a/README.md +++ b/README.md @@ -403,6 +403,8 @@ When the `keep_all_tags` parameter is passed while filtering by OSM tags, and ad General schema of multiple segments that are concatenated together: `pbf_file_name`\_(`osm_filter_tags_hash_part`/`nofilter`)(\_`alltags`)\_(`clipping_geometry_hash_part`/`noclip`)\_(`compact`/`exploded`)(\_`filter_osm_ids_hash_part`).geoparquet +> If the WKT mode is turned on, then the result file will be saved with a `parquet` extension and a `_wkt` suffix. + ### Memory usage DuckDB queries requiring `JOIN`, `GROUP` and `ORDER BY` operations are very memory intensive. Because of that, some steps are divided into chunks (groups) with a set number of rows per chunk. diff --git a/examples/command_line_interface.ipynb b/examples/command_line_interface.ipynb index df5ccaf..7bd8acd 100644 --- a/examples/command_line_interface.ipynb +++ b/examples/command_line_interface.ipynb @@ -452,6 +452,41 @@ " \"files/andorra_8a275d4edddd035eb6a5d8120a8b42a320b25cf93577335600faba8c2d69d85a_noclip_compact.geoparquet\"\n", ")" ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## WKT mode\n", + "\n", + "By default, QuackOSM saves parsed files in the `GeoParquet` format with the geometry in the `WKB` format.\n", + "\n", + "There is also an option to save the file as a `Parquet` file with the geometry in the `WKT` format using `--wkt-result` (or `--wkt`) parameter." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "vscode": { + "languageId": "shellscript" + } + }, + "outputs": [], + "source": [ + "! QuackOSM andorra.osm.pbf --wkt-result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "duckdb.read_parquet(\n", + " \"files/andorra_nofilter_noclip_compact_wkt.parquet\"\n", + ")" + ] } ], "metadata": { diff --git a/quackosm/cli.py b/quackosm/cli.py index c55263c..66880b0 100644 --- a/quackosm/cli.py +++ b/quackosm/cli.py @@ -500,6 +500,15 @@ def main( callback=_filter_osm_ids_callback, ), ] = None, + wkt_result: Annotated[ + bool, + typer.Option( + "--wkt-result/", + "--wkt/", + help="Whether to save the geometry as a WKT string instead of WKB blob.", + show_default=False, + ), + ] = False, silent_mode: Annotated[ bool, typer.Option( @@ -583,6 +592,7 @@ def main( else None ), filter_osm_ids=filter_osm_ids, # type: ignore + save_as_wkt=wkt_result, silent_mode=silent_mode, ) else: @@ -601,6 +611,7 @@ def main( else None ), filter_osm_ids=filter_osm_ids, # type: ignore + save_as_wkt=wkt_result, silent_mode=silent_mode, ) typer.secho(geoparquet_path, fg="green") diff --git a/quackosm/functions.py b/quackosm/functions.py index 82f0cd7..47d4500 100644 --- a/quackosm/functions.py +++ b/quackosm/functions.py @@ -28,6 +28,7 @@ def convert_pbf_to_gpq( filter_osm_ids: Optional[list[str]] = None, working_directory: Union[str, Path] = "files", osm_way_polygon_features_config: Optional[Union[OsmWayPolygonConfig, dict[str, Any]]] = None, + save_as_wkt: bool = False, silent_mode: bool = False, ) -> Path: """ @@ -69,7 +70,10 @@ def convert_pbf_to_gpq( Config used to determine which closed way features are polygons. Modifications to this config left are left for experienced OSM users. Defaults to predefined "osm_way_polygon_features.json". - silent_mode (bool): Disable progress bars. + save_as_wkt (bool): Whether to save the file with geometry in the WKT form instead of WKB. + If `True`, it will be saved as a `.parquet` file, because it won't be in the GeoParquet + standard. Defaults to `False`. + silent_mode (bool): Disable progress bars. Defaults to `False`. Returns: Path: Path to the generated GeoParquet file. @@ -229,6 +233,7 @@ def convert_pbf_to_gpq( explode_tags=explode_tags, ignore_cache=ignore_cache, filter_osm_ids=filter_osm_ids, + save_as_wkt=save_as_wkt, ) @@ -243,6 +248,7 @@ def convert_geometry_to_gpq( filter_osm_ids: Optional[list[str]] = None, working_directory: Union[str, Path] = "files", osm_way_polygon_features_config: Optional[Union[OsmWayPolygonConfig, dict[str, Any]]] = None, + save_as_wkt: bool = False, silent_mode: bool = False, ) -> Path: """ @@ -287,7 +293,10 @@ def convert_geometry_to_gpq( Config used to determine which closed way features are polygons. Modifications to this config left are left for experienced OSM users. Defaults to predefined "osm_way_polygon_features.json". - silent_mode (bool): Disable progress bars. + save_as_wkt (bool): Whether to save the file with geometry in the WKT form instead of WKB. + If `True`, it will be saved as a `.parquet` file, because it won't be in the GeoParquet + standard. Defaults to `False`. + silent_mode (bool): Disable progress bars. Defaults to `False`. Returns: Path: Path to the generated GeoParquet file. @@ -399,6 +408,7 @@ def convert_geometry_to_gpq( explode_tags=explode_tags, ignore_cache=ignore_cache, filter_osm_ids=filter_osm_ids, + save_as_wkt=save_as_wkt, ) @@ -454,7 +464,7 @@ def get_features_gdf( Config used to determine which closed way features are polygons. Modifications to this config left are left for experienced OSM users. Defaults to predefined "osm_way_polygon_features.json". - silent_mode (bool): Disable progress bars. + silent_mode (bool): Disable progress bars. Defaults to `False`. Returns: gpd.GeoDataFrame: GeoDataFrame with OSM features. @@ -634,7 +644,7 @@ def get_features_gdf_from_geometry( Config used to determine which closed way features are polygons. Modifications to this config left are left for experienced OSM users. Defaults to predefined "osm_way_polygon_features.json". - silent_mode (bool): Disable progress bars. + silent_mode (bool): Disable progress bars. Defaults to `False`. Returns: gpd.GeoDataFrame: GeoDataFrame with OSM features. diff --git a/quackosm/pbf_file_reader.py b/quackosm/pbf_file_reader.py index 685a9bf..4cf29b4 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -175,6 +175,7 @@ def convert_pbf_to_gpq( explode_tags: Optional[bool] = None, ignore_cache: bool = False, filter_osm_ids: Optional[list[str]] = None, + save_as_wkt: bool = False, ) -> Path: """ Convert PBF file to GeoParquet file. @@ -197,6 +198,9 @@ def convert_pbf_to_gpq( filter_osm_ids: (list[str], optional): List of OSM features ids to read from the file. Have to be in the form of 'node/', 'way/' or 'relation/'. Defaults to an empty list. + save_as_wkt (bool): Whether to save the file with geometry in the WKT form instead + of WKB. If `True`, it will be saved as a `.parquet` file, because it won't be + in the GeoParquet standard. Defaults to `False`. Returns: Path: Path to the generated GeoParquet file. @@ -212,11 +216,12 @@ def convert_pbf_to_gpq( try: self.encountered_query_exception = False self.connection = _set_up_duckdb_connection(tmp_dir_path=self.tmp_dir_path) - result_file_path = result_file_path or self._generate_geoparquet_result_file_path( + result_file_path = result_file_path or self._generate_result_file_path( pbf_path, filter_osm_ids=filter_osm_ids, keep_all_tags=keep_all_tags, explode_tags=explode_tags, + save_as_wkt=save_as_wkt, ) parsed_geoparquet_file = self._parse_pbf_file( pbf_path=pbf_path, @@ -225,6 +230,7 @@ def convert_pbf_to_gpq( keep_all_tags=keep_all_tags, explode_tags=explode_tags, ignore_cache=ignore_cache, + save_as_wkt=save_as_wkt, ) return parsed_geoparquet_file finally: @@ -239,6 +245,7 @@ def convert_geometry_filter_to_gpq( explode_tags: Optional[bool] = None, ignore_cache: bool = False, filter_osm_ids: Optional[list[str]] = None, + save_as_wkt: bool = False, ) -> Path: """ Convert geometry to GeoParquet file. @@ -263,6 +270,9 @@ def convert_geometry_filter_to_gpq( filter_osm_ids: (list[str], optional): List of OSM features ids to read from the file. Have to be in the form of 'node/', 'way/' or 'relation/'. Defaults to an empty list. + save_as_wkt (bool): Whether to save the file with geometry in the WKT form instead + of WKB. If `True`, it will be saved as a `.parquet` file, because it won't be + in the GeoParquet standard. Defaults to `False`. Returns: Path: Path to the generated GeoParquet file. @@ -281,10 +291,11 @@ def convert_geometry_filter_to_gpq( result_file_path = Path( result_file_path - or self._generate_geoparquet_result_file_path_from_geometry( + or self._generate_result_file_path_from_geometry( filter_osm_ids=filter_osm_ids, keep_all_tags=keep_all_tags, explode_tags=explode_tags, + save_as_wkt=save_as_wkt, ) ) @@ -301,6 +312,7 @@ def convert_geometry_filter_to_gpq( explode_tags=explode_tags, ignore_cache=ignore_cache, filter_osm_ids=filter_osm_ids, + save_as_wkt=save_as_wkt, ) else: if not result_file_path.exists() or ignore_cache: @@ -317,15 +329,21 @@ def convert_geometry_filter_to_gpq( explode_tags=explode_tags, ignore_cache=ignore_cache, filter_osm_ids=filter_osm_ids, + save_as_wkt=save_as_wkt, ) parsed_geoparquet_files.append(parsed_geoparquet_file) joined_parquet_table = self._drop_duplicates_features_in_pyarrow_table( parsed_geoparquet_files ) - io.write_geoparquet_table( - joined_parquet_table, result_file_path, primary_geometry_column=GEOMETRY_COLUMN - ) + if save_as_wkt: + pq.write_table(joined_parquet_table, result_file_path) + else: + io.write_geoparquet_table( + joined_parquet_table, + result_file_path, + primary_geometry_column=GEOMETRY_COLUMN, + ) return Path(result_file_path) @@ -460,6 +478,7 @@ def _parse_pbf_file( keep_all_tags: bool = False, explode_tags: bool = True, ignore_cache: bool = False, + save_as_wkt: bool = False, ) -> Path: if not result_file_path.exists() or ignore_cache: elements = self.connection.sql(f"SELECT * FROM ST_READOSM('{Path(pbf_path)}');") @@ -546,16 +565,18 @@ def _parse_pbf_file( save_file_path=result_file_path, keep_all_tags=keep_all_tags, explode_tags=explode_tags, + save_as_wkt=save_as_wkt, ) return result_file_path - def _generate_geoparquet_result_file_path( + def _generate_result_file_path( self, pbf_file_path: Union[str, Path], keep_all_tags: bool, explode_tags: bool, filter_osm_ids: list[str], + save_as_wkt: bool, ) -> Path: pbf_file_name = Path(pbf_file_path).name.removesuffix(".osm.pbf") @@ -576,14 +597,20 @@ def _generate_geoparquet_result_file_path( h.update(json.dumps(sorted(set(filter_osm_ids))).encode()) filter_osm_ids_hash_part = f"_{h.hexdigest()}" - result_file_name = ( - f"{pbf_file_name}_{osm_filter_tags_hash_part}" - f"_{clipping_geometry_hash_part}_{exploded_tags_part}{filter_osm_ids_hash_part}.geoparquet" - ) + if save_as_wkt: + result_file_name = ( + f"{pbf_file_name}_{osm_filter_tags_hash_part}" + f"_{clipping_geometry_hash_part}_{exploded_tags_part}{filter_osm_ids_hash_part}_wkt.parquet" + ) + else: + result_file_name = ( + f"{pbf_file_name}_{osm_filter_tags_hash_part}" + f"_{clipping_geometry_hash_part}_{exploded_tags_part}{filter_osm_ids_hash_part}.geoparquet" + ) return Path(self.working_directory) / result_file_name - def _generate_geoparquet_result_file_path_from_geometry( - self, keep_all_tags: bool, explode_tags: bool, filter_osm_ids: list[str] + def _generate_result_file_path_from_geometry( + self, keep_all_tags: bool, explode_tags: bool, filter_osm_ids: list[str], save_as_wkt: bool ) -> Path: osm_filter_tags_hash_part = "nofilter" if self.tags_filter is not None: @@ -602,10 +629,16 @@ def _generate_geoparquet_result_file_path_from_geometry( h.update(json.dumps(sorted(set(filter_osm_ids))).encode()) filter_osm_ids_hash_part = f"_{h.hexdigest()}" - result_file_name = ( - f"{clipping_geometry_hash_part}_{osm_filter_tags_hash_part}" - f"_{exploded_tags_part}{filter_osm_ids_hash_part}.geoparquet" - ) + if save_as_wkt: + result_file_name = ( + f"{clipping_geometry_hash_part}_{osm_filter_tags_hash_part}" + f"_{exploded_tags_part}{filter_osm_ids_hash_part}_wkt.parquet" + ) + else: + result_file_name = ( + f"{clipping_geometry_hash_part}_{osm_filter_tags_hash_part}" + f"_{exploded_tags_part}{filter_osm_ids_hash_part}.geoparquet" + ) return Path(self.working_directory) / result_file_name def _generate_geometry_hash(self) -> str: @@ -1847,6 +1880,7 @@ def _concatenate_results_to_geoparquet( save_file_path: Path, keep_all_tags: bool, explode_tags: bool, + save_as_wkt: bool, ) -> None: select_clauses = [ "feature_id", @@ -1886,10 +1920,16 @@ def _concatenate_results_to_geoparquet( is_empty = valid_features_parquet_table.num_rows == 0 - if not is_empty: + if not is_empty and save_as_wkt: + geometry_column = ga.as_wkt( + ga.with_crs(valid_features_parquet_table.column("geometry_wkb"), WGS84_CRS) + ) + elif not is_empty: geometry_column = ga.as_wkb( ga.with_crs(valid_features_parquet_table.column("geometry_wkb"), WGS84_CRS) ) + elif save_as_wkt: + geometry_column = ga.as_wkt(gpd.GeoSeries([], crs=WGS84_CRS)) else: geometry_column = ga.as_wkb(gpd.GeoSeries([], crs=WGS84_CRS)) @@ -1945,14 +1985,24 @@ def _concatenate_results_to_geoparquet( current_invalid_features_group_table = pq.read_table( current_invalid_features_group_path ).drop("group") - valid_geometry_column = ga.as_wkb( - ga.to_geopandas( - ga.with_crs( - current_invalid_features_group_table.column("geometry_wkb"), - WGS84_CRS, - ) - ).make_valid() - ) + if save_as_wkt: + valid_geometry_column = ga.as_wkt( + ga.to_geopandas( + ga.with_crs( + current_invalid_features_group_table.column("geometry_wkb"), + WGS84_CRS, + ) + ).make_valid() + ) + else: + valid_geometry_column = ga.as_wkb( + ga.to_geopandas( + ga.with_crs( + current_invalid_features_group_table.column("geometry_wkb"), + WGS84_CRS, + ) + ).make_valid() + ) current_invalid_features_group_table = ( current_invalid_features_group_table.append_column( @@ -1983,10 +2033,14 @@ def _concatenate_results_to_geoparquet( if empty_columns: joined_parquet_table = joined_parquet_table.drop(empty_columns) - with TaskProgressSpinner("Saving final geoparquet file", "32", self.silent_mode): - io.write_geoparquet_table( - joined_parquet_table, save_file_path, primary_geometry_column=GEOMETRY_COLUMN - ) + if save_as_wkt: + with TaskProgressSpinner("Saving final parquet file", "32", self.silent_mode): + pq.write_table(joined_parquet_table, save_file_path) + else: + with TaskProgressSpinner("Saving final geoparquet file", "32", self.silent_mode): + io.write_geoparquet_table( + joined_parquet_table, save_file_path, primary_geometry_column=GEOMETRY_COLUMN + ) def _generate_osm_tags_sql_select( self, parsed_geometries: "duckdb.DuckDBPyRelation", keep_all_tags: bool, explode_tags: bool diff --git a/tests/base/test_cli.py b/tests/base/test_cli.py index 2d3e44c..de2cd14 100644 --- a/tests/base/test_cli.py +++ b/tests/base/test_cli.py @@ -93,6 +93,7 @@ def test_silent_mode(monaco_pbf_file_path: str) -> None: assert result.exit_code == 0 assert str(Path("files/monaco_nofilter_noclip_compact.geoparquet")) == result.stdout.strip() + @P.parameters("args", "expected_result") # type: ignore @P.case( "Explode", @@ -264,6 +265,8 @@ def test_silent_mode(monaco_pbf_file_path: str) -> None: ["--osm-way-polygon-config", osm_way_config_file_path()], "files/monaco_nofilter_noclip_compact.geoparquet", ) # type: ignore +@P.case("WKT", ["--wkt-result"], "files/monaco_nofilter_noclip_compact_wkt.parquet") # type: ignore +@P.case("WKT short", ["--wkt"], "files/monaco_nofilter_noclip_compact_wkt.parquet") # type: ignore def test_proper_args_with_pbf( monaco_pbf_file_path: str, args: list[str], expected_result: str ) -> None: diff --git a/tests/base/test_pbf_file_reader.py b/tests/base/test_pbf_file_reader.py index cf8a861..c6e6583 100644 --- a/tests/base/test_pbf_file_reader.py +++ b/tests/base/test_pbf_file_reader.py @@ -105,18 +105,21 @@ def test_pbf_reader( @pytest.mark.parametrize("tags_filter", [None, HEX2VEC_FILTER, GEOFABRIK_LAYERS]) # type: ignore @pytest.mark.parametrize("explode_tags", [None, True, False]) # type: ignore @pytest.mark.parametrize("keep_all_tags", [True, False]) # type: ignore +@pytest.mark.parametrize("save_as_wkt", [True, False]) # type: ignore def test_pbf_to_geoparquet_parsing( tags_filter: Optional[Union[OsmTagsFilter, GroupedOsmTagsFilter]], explode_tags: Optional[bool], keep_all_tags: bool, + save_as_wkt: bool, ): """Test if pbf to geoparquet conversion works.""" pbf_file = Path(__file__).parent.parent / "test_files" / "monaco.osm.pbf" - PbfFileReader(tags_filter=tags_filter).get_features_gdf( - file_paths=pbf_file, + PbfFileReader(tags_filter=tags_filter).convert_pbf_to_gpq( + pbf_path=pbf_file, ignore_cache=True, explode_tags=explode_tags, keep_all_tags=keep_all_tags, + save_as_wkt=save_as_wkt, )