Skip to content

Commit

Permalink
refactor: make cleaner usage of the grouped ways ids
Browse files Browse the repository at this point in the history
  • Loading branch information
RaczeQ committed Jan 2, 2024
1 parent 45d3adf commit 00d4ae7
Showing 1 changed file with 28 additions and 29 deletions.
57 changes: 28 additions & 29 deletions quackosm/pbf_file_reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -911,20 +911,20 @@ def _get_filtered_ways_with_linestrings(
destination_dir_path = Path(tmp_dir_name) / "filtered_ways_with_linestrings"

with TaskProgressSpinner("Grouping filtered ways", "24"):
groups = self._group_ways(
groups = self._group_ways_ids(
ways_ids=osm_parquet_files.ways_filtered_ids,
osm_parquet_files=osm_parquet_files,
required_nodes_with_structs=required_nodes_with_structs,
grouped_ways_path=grouped_ways_path,
grouped_ways_ids_path=grouped_ways_ids_path,
destination_dir_path=destination_dir_path,
grouped_ways_ids_path=grouped_ways_ids_path,
)

with TaskProgressBar("Saving filtered ways with linestrings", "25") as bar:
self._construct_ways_linestrings(
bar=bar,
groups=groups,
osm_parquet_files=osm_parquet_files,
required_nodes_with_structs=required_nodes_with_structs,
destination_dir_path=destination_dir_path,
grouped_ways_ids_path=grouped_ways_ids_path,
grouped_ways_path=grouped_ways_path,
)

Expand All @@ -944,20 +944,20 @@ def _get_required_ways_with_linestrings(
destination_dir_path = Path(tmp_dir_name) / "required_ways_with_linestrings"

with TaskProgressSpinner("Grouping required ways", "26"):
groups = self._group_ways(
ways_ids=osm_parquet_files.ways_required_ids,
osm_parquet_files=osm_parquet_files,
required_nodes_with_structs=required_nodes_with_structs,
grouped_ways_path=grouped_ways_path,
grouped_ways_ids_path=grouped_ways_ids_path,
groups = self._group_ways_ids(
ways_ids=osm_parquet_files.ways_filtered_ids,
destination_dir_path=destination_dir_path,
grouped_ways_ids_path=grouped_ways_ids_path,
)

with TaskProgressBar("Saving required ways with linestrings", "27") as bar:
self._construct_ways_linestrings(
bar=bar,
groups=groups,
osm_parquet_files=osm_parquet_files,
required_nodes_with_structs=required_nodes_with_structs,
destination_dir_path=destination_dir_path,
grouped_ways_ids_path=grouped_ways_ids_path,
grouped_ways_path=grouped_ways_path,
)

Expand All @@ -966,13 +966,10 @@ def _get_required_ways_with_linestrings(
""")
return ways_parquet

def _group_ways(
def _group_ways_ids(
self,
ways_ids: "duckdb.DuckDBPyRelation",
osm_parquet_files: ConvertedOSMParquetFiles,
required_nodes_with_structs: "duckdb.DuckDBPyRelation",
destination_dir_path: Path,
grouped_ways_path: Path,
grouped_ways_ids_path: Path,
) -> int:
total_required_ways = ways_ids.count("id").fetchone()[0]
Expand All @@ -998,13 +995,26 @@ def _group_ways(
(FORMAT 'parquet', PARTITION_BY ("group"), ROW_GROUP_SIZE 25000)
""")

return groups

def _construct_ways_linestrings(
self,
bar: TaskProgressBar,
groups: int,
osm_parquet_files: ConvertedOSMParquetFiles,
required_nodes_with_structs: "duckdb.DuckDBPyRelation",
destination_dir_path: Path,
grouped_ways_ids_path: Path,
grouped_ways_path: Path,
) -> None:
grouped_ways_path.mkdir(parents=True, exist_ok=True)

for group in range(groups + 1):
for group in bar.track(range(groups + 1)):
current_ways_ids_group_path = grouped_ways_ids_path / f"group={group}"
current_ways_ids_group_relation = self.connection.sql(f"""
SELECT * FROM read_parquet('{current_ways_ids_group_path}/**')
""")

current_ways_group_relation = self.connection.sql(f"""
SELECT
w.id, n.point, w.ref_idx
Expand All @@ -1014,25 +1024,14 @@ def _group_ways(
JOIN ({required_nodes_with_structs.sql_query()}) n
ON n.id = w.ref
""")
self._save_parquet_file(
current_ways_group_relation = self._save_parquet_file(
relation=current_ways_group_relation,
file_path=grouped_ways_path / f"group={group}",
)

return groups

def _construct_ways_linestrings(
self, bar: TaskProgressBar, groups: int, destination_dir_path: Path, grouped_ways_path: Path
) -> None:
for group in bar.track(range(groups + 1)):
current_required_ways_group_path = grouped_ways_path / f"group={group}"
current_required_ways_group_relation = self.connection.sql(f"""
SELECT * FROM read_parquet('{current_required_ways_group_path}/**')
""")

ways_with_linestrings = self.connection.sql(f"""
SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring
FROM ({current_required_ways_group_relation.sql_query()})
FROM ({current_ways_group_relation.sql_query()})
GROUP BY id
""")
self._save_parquet_file(
Expand Down

0 comments on commit 00d4ae7

Please sign in to comment.