diff --git a/quackosm/pbf_file_reader.py b/quackosm/pbf_file_reader.py index 9ff75ca..4030415 100644 --- a/quackosm/pbf_file_reader.py +++ b/quackosm/pbf_file_reader.py @@ -475,15 +475,6 @@ def _parse_pbf_file( ) self._delete_directories("nodes_filtered_ids") - # ways_refs_with_nodes_structs = self._get_ways_refs_with_nodes_structs( - # converted_osm_parquet_files - # ) - # self._delete_directories( - # [ - # "nodes_valid_with_tags", - # ], - # ) - filtered_ways_with_linestrings = self._get_filtered_ways_with_linestrings( osm_parquet_files=converted_osm_parquet_files ) @@ -496,7 +487,6 @@ def _parse_pbf_file( "ways_required_grouped", "ways_required_ids", "ways_with_unnested_nodes_refs", - # "ways_refs_with_nodes_structs", "required_ways_ids_grouped", "required_ways_grouped", "required_ways_tmp", @@ -775,19 +765,17 @@ def _prefilter_elements_ids( ways_valid_ids = self._sql_to_parquet_file( sql_query=f""" WITH total_ways_with_nodes_refs AS ( - SELECT id, ref + SELECT id FROM ({ways_with_unnested_nodes_refs.sql_query()}) ), unmatched_ways_with_nodes_refs AS ( - SELECT id, ref + SELECT id FROM ({ways_with_unnested_nodes_refs.sql_query()}) w ANTI JOIN ({nodes_valid_with_tags.sql_query()}) nv ON nv.id = w.ref ) SELECT DISTINCT id FROM total_ways_with_nodes_refs - EXCEPT - SELECT DISTINCT id - FROM unmatched_ways_with_nodes_refs + ANTI JOIN unmatched_ways_with_nodes_refs USING (id) """, file_path=self.tmp_dir_path / "ways_valid_ids", ) @@ -879,19 +867,17 @@ def _prefilter_elements_ids( relations_valid_ids = self._sql_to_parquet_file( sql_query=f""" WITH total_relation_refs AS ( - SELECT id, ref + SELECT id FROM ({relations_with_unnested_way_refs.sql_query()}) frr ), unmatched_relation_refs AS ( - SELECT id, ref + SELECT id FROM ({relations_with_unnested_way_refs.sql_query()}) r ANTI JOIN ({ways_valid_ids.sql_query()}) wv ON wv.id = r.ref ) SELECT DISTINCT id FROM total_relation_refs - EXCEPT - SELECT DISTINCT id - FROM unmatched_relation_refs + ANTI JOIN unmatched_relation_refs USING (id) """, file_path=self.tmp_dir_path / "relations_valid_ids", ) @@ -1076,7 +1062,7 @@ def _save_parquet_file( ) -> "duckdb.DuckDBPyRelation": query = f""" COPY ( - SELECT * FROM ({relation.sql_query()}) + {relation.sql_query()} ) TO '{file_path}' ( FORMAT 'parquet', PER_THREAD_OUTPUT true, @@ -1084,9 +1070,13 @@ def _save_parquet_file( COMPRESSION '{self.parquet_compression}' ) """ + self._run_query(query, run_in_separate_process) + return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')") + + def _run_query(self, sql_query: str, run_in_separate_process: bool = False) -> None: if run_in_separate_process: with Pool() as pool: - r = pool.apply_async(_run_query, args=(query, self.tmp_dir_path)) + r = pool.apply_async(_run_query, args=(sql_query, self.tmp_dir_path)) actual_memory = psutil.virtual_memory() percentage_threshold = 95 if (actual_memory.total * 0.05) > MEMORY_1GB: @@ -1102,8 +1092,7 @@ def _save_parquet_file( sleep(0.5) r.get() else: - self.connection.sql(query) - return self.connection.sql(f"SELECT * FROM read_parquet('{file_path}/**')") + self.connection.sql(sql_query) def _calculate_unique_ids_to_parquet( self, file_path: Path, result_path: Optional[Path] = None @@ -1447,6 +1436,8 @@ def _group_ways( """ ) + self._delete_directories(current_ways_group_path) + return groups def _construct_ways_linestrings( @@ -1458,24 +1449,32 @@ def _construct_ways_linestrings( ) -> None: for group in bar.track(range(groups + 1)): current_ways_group_path = grouped_ways_path / f"group={group}" - ways_with_linestrings = self.connection.sql( - f""" - SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring - FROM read_parquet( - '{current_ways_group_path}/**', - hive_partitioning = true, - hive_types = {{ 'ref_idx': SMALLINT }} + current_destination_path = destination_dir_path / f"group={group}" + + query = f""" + COPY ( + SELECT id, list(point ORDER BY ref_idx ASC)::LINESTRING_2D linestring + FROM read_parquet( + '{current_ways_group_path}/**', + hive_partitioning = TRUE, + hive_types = {{ 'ref_idx': SMALLINT }} + ) + GROUP BY id + ) TO '{current_destination_path}' ( + FORMAT 'parquet', + PER_THREAD_OUTPUT true, + ROW_GROUP_SIZE 25000, + COMPRESSION '{self.parquet_compression}' ) - GROUP BY id - """ - ) - self._save_parquet_file( - relation=ways_with_linestrings, - file_path=destination_dir_path / f"group={group}", + """ + + self._run_query( + query, run_in_separate_process=( self.rows_per_bucket > PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG[0] ), ) + self._delete_directories(current_ways_group_path) def _get_filtered_ways_with_proper_geometry(