Skip to content

Commit

Permalink
chore: clean reader code
Browse files (browse the repository at this point in the history)
  • Loading branch information
RaczeQ committed Mar 11, 2024
1 parent 00c73d8 commit 9d09941
Showing 1 changed file with 36 additions and 37 deletions.
73 changes: 36 additions & 37 deletions quackosm/pbf_file_reader.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -475,15 +475,6 @@ def _parse_pbf_file(
)
self._delete_directories("nodes_filtered_ids")

# ways_refs_with_nodes_structs = self._get_ways_refs_with_nodes_structs(
# converted_osm_parquet_files
# )
# self._delete_directories(
# [
# "nodes_valid_with_tags",
# ],
# )

filtered_ways_with_linestrings = self._get_filtered_ways_with_linestrings(
osm_parquet_files=converted_osm_parquet_files
)
Expand All @@ -496,7 +487,6 @@ def _parse_pbf_file(
"ways_required_grouped",
"ways_required_ids",
"ways_with_unnested_nodes_refs",
# "ways_refs_with_nodes_structs",
"required_ways_ids_grouped",
"required_ways_grouped",
"required_ways_tmp",
Expand Down Expand Up @@ -775,19 +765,17 @@ def _prefilter_elements_ids(
ways_valid_ids = self._sql_to_parquet_file(
sql_query=f"""
WITH total_ways_with_nodes_refs AS (
SELECT id, ref
SELECT id
FROM ({ways_with_unnested_nodes_refs.sql_query()})
),
unmatched_ways_with_nodes_refs AS (
SELECT id, ref
SELECT id
FROM ({ways_with_unnested_nodes_refs.sql_query()}) w
ANTI JOIN ({nodes_valid_with_tags.sql_query()}) nv ON nv.id = w.ref
)
SELECT DISTINCT id
FROM total_ways_with_nodes_refs
EXCEPT
SELECT DISTINCT id
FROM unmatched_ways_with_nodes_refs
ANTI JOIN unmatched_ways_with_nodes_refs USING (id)
""",
file_path=self.tmp_dir_path / "ways_valid_ids",
)
Expand Down Expand Up @@ -879,19 +867,17 @@ def _prefilter_elements_ids(
relations_valid_ids = self._sql_to_parquet_file(
sql_query=f"""
WITH total_relation_refs AS (
SELECT id, ref
SELECT id
FROM ({relations_with_unnested_way_refs.sql_query()}) frr
),
unmatched_relation_refs AS (
SELECT id, ref
SELECT id
FROM ({relations_with_unnested_way_refs.sql_query()}) r
ANTI JOIN ({ways_valid_ids.sql_query()}) wv ON wv.id = r.ref
)
SELECT DISTINCT id
FROM total_relation_refs
EXCEPT
SELECT DISTINCT id
FROM unmatched_relation_refs
ANTI JOIN unmatched_relation_refs USING (id)
""",
file_path=self.tmp_dir_path / "relations_valid_ids",
)
Expand Down Expand Up @@ -1076,17 +1062,21 @@ def _save_parquet_file(
) -> "duckdb.DuckDBPyRelation":
query = f"""
COPY (
SELECT * FROM ({relation.sql_query()})
{relation.sql_query()}
) TO '{file_path}' (
FORMAT 'parquet',
PER_THREAD_OUTPUT true,
ROW_GROUP_SIZE 25000,
COMPRESSION '{self.parquet_compression}'
)
"""
self._run_query(query, run_in_separate_process)
return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')")

def _run_query(self, sql_query: str, run_in_separate_process: bool = False) -> None:
if run_in_separate_process:
with Pool() as pool:
r = pool.apply_async(_run_query, args=(query, self.tmp_dir_path))
r = pool.apply_async(_run_query, args=(sql_query, self.tmp_dir_path))
actual_memory = psutil.virtual_memory()
percentage_threshold = 95
if (actual_memory.total * 0.05) > MEMORY_1GB:
Expand All @@ -1102,8 +1092,7 @@ def _save_parquet_file(
sleep(0.5)
r.get()
else:
self.connection.sql(query)
return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')")
self.connection.sql(sql_query)

def _calculate_unique_ids_to_parquet(
self, file_path: Path, result_path: Optional[Path] = None
Expand Down Expand Up @@ -1447,6 +1436,8 @@ def _group_ways(
"""
)

self._delete_directories(current_ways_group_path)

return groups

def _construct_ways_linestrings(
Expand All @@ -1458,24 +1449,32 @@ def _construct_ways_linestrings(
) -> None:
for group in bar.track(range(groups + 1)):
current_ways_group_path = grouped_ways_path / f"group={group}"
ways_with_linestrings = self.connection.sql(
f"""
SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring
FROM read_parquet(
'{current_ways_group_path}/**',
hive_partitioning = true,
hive_types = {{ 'ref_idx': SMALLINT }}
current_destination_path = destination_dir_path / f"group={group}"

query = f"""
COPY (
SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring
FROM read_parquet(
'{current_ways_group_path}/**',
hive_partitioning = TRUE,
hive_types = {{ 'ref_idx': SMALLINT }}
)
GROUP BY id
) TO '{current_destination_path}' (
FORMAT 'parquet',
PER_THREAD_OUTPUT true,
ROW_GROUP_SIZE 25000,
COMPRESSION '{self.parquet_compression}'
)
GROUP BY id
"""
)
self._save_parquet_file(
relation=ways_with_linestrings,
file_path=destination_dir_path / f"group={group}",
"""

self._run_query(
query,
run_in_separate_process=(
self.rows_per_bucket > PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG[0]
),
)

self._delete_directories(current_ways_group_path)

def _get_filtered_ways_with_proper_geometry(
Expand Down

0 comments on commit 9d09941

Please sign in to comment.