feat: add separate process query execution
RaczeQ committed Mar 6, 2024
1 parent 5c38e4f commit 1542934
Showing 3 changed files with 57 additions and 40 deletions.
89 changes: 53 additions & 36 deletions quackosm/pbf_file_reader.py
@@ -12,13 +12,15 @@
 import warnings
 from collections.abc import Iterable
 from math import floor
+from multiprocessing import Pool
 from pathlib import Path
 from time import sleep
 from typing import Any, Literal, NamedTuple, Optional, Union, cast
 
 import duckdb
 import geoarrow.pyarrow as ga
 import geopandas as gpd
+import psutil
 import pyarrow as pa
 import pyarrow.parquet as pq
 import shapely.wkt as wktlib
Expand Down Expand Up @@ -133,14 +135,13 @@ def __init__(
self.connection: duckdb.DuckDBPyConnection = None

self.rows_per_bucket = PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG[24]
# self.rows_per_bucket = PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG[0]
# acutal_memory = psutil.virtual_memory()
# # If less than 8 / 16 / 24 GB total memory, reduce number of rows per group
# for memory_gb, rows_per_bucket in PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG.items():
# if acutal_memory.total >= (memory_gb * (1024**3)):
# self.rows_per_bucket = rows_per_bucket
# else:
# break
acutal_memory = psutil.virtual_memory()
# If less than 8 / 16 / 24 GB total memory, reduce number of rows per group
for memory_gb, rows_per_bucket in PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG.items():
if acutal_memory.total >= (memory_gb * (1024**3)):
self.rows_per_bucket = rows_per_bucket
else:
break

self.parquet_compression = parquet_compression
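
The re-enabled logic depends on ROWS_PER_BUCKET_MEMORY_CONFIG being ordered by ascending RAM threshold: the loop keeps upgrading rows_per_bucket while total memory clears each threshold and stops at the first one it misses. Below is a standalone sketch of that selection logic; the bucket values are illustrative stand-ins, since the diff only implies the 0 / 8 / 16 / 24 GB keys and never shows the real values.

    import psutil

    # Hypothetical values; the real mapping lives on PbfFileReader and is
    # not shown in this diff. Keys are total-RAM floors in GB, ascending.
    ROWS_PER_BUCKET_MEMORY_CONFIG = {
        0: 100_000,
        8: 500_000,
        16: 1_000_000,
        24: 5_000_000,
    }

    def select_rows_per_bucket() -> int:
        rows_per_bucket = ROWS_PER_BUCKET_MEMORY_CONFIG[0]
        total_memory = psutil.virtual_memory().total
        for memory_gb, rows in ROWS_PER_BUCKET_MEMORY_CONFIG.items():
            if total_memory >= memory_gb * (1024**3):
                rows_per_bucket = rows  # machine clears this threshold
            else:
                break  # first missed threshold ends the upgrades
        return rows_per_bucket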

Expand Down Expand Up @@ -201,7 +202,7 @@ def convert_pbf_to_gpq(
with tempfile.TemporaryDirectory(dir=self.working_directory.resolve()) as self.tmp_dir_name:
self.tmp_dir_path = Path(self.tmp_dir_name)
try:
self._set_up_duckdb_connection()
self.connection = _set_up_duckdb_connection(tmp_dir_path=self.tmp_dir_path)
result_file_path = result_file_path or self._generate_geoparquet_result_file_path(
pbf_path,
filter_osm_ids=filter_osm_ids,
Expand Down Expand Up @@ -442,28 +443,6 @@ def _drop_duplicates_features_in_pyarrow_table(
)
return joined_parquet_table

def _set_up_duckdb_connection(self) -> None:
self.connection = duckdb.connect(
database=str(self.tmp_dir_path / "db.duckdb"),
config=dict(preserve_insertion_order=False),
)
for extension_name in ("parquet", "spatial"):
self.connection.install_extension(extension_name)
self.connection.load_extension(extension_name)

self.connection.sql(
"""
CREATE OR REPLACE MACRO linestring_to_linestring_wkt(ls) AS
'LINESTRING (' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || ')';
"""
)
self.connection.sql(
"""
CREATE OR REPLACE MACRO linestring_to_polygon_wkt(ls) AS
'POLYGON ((' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || '))';
"""
)

def _parse_pbf_file(
self,
pbf_path: Union[str, Path],
Expand Down Expand Up @@ -1088,10 +1067,12 @@ def _sql_to_parquet_file(self, sql_query: str, file_path: Path) -> "duckdb.DuckD
return self._save_parquet_file(relation, file_path)

def _save_parquet_file(
self, relation: "duckdb.DuckDBPyRelation", file_path: Path
self,
relation: "duckdb.DuckDBPyRelation",
file_path: Path,
run_in_separate_process: bool = False,
) -> "duckdb.DuckDBPyRelation":
self.connection.sql(
f"""
query = f"""
COPY (
SELECT * FROM ({relation.sql_query()})
) TO '{file_path}' (
@@ -1100,8 +1081,13 @@
                 ROW_GROUP_SIZE 25000,
                 COMPRESSION '{self.parquet_compression}'
             )
-            """
-        )
+        """
+        if run_in_separate_process:
+            with Pool() as pool:
+                r = pool.apply_async(_run_query, args=(query, self.tmp_dir_path))
+                r.get()
+        else:
+            self.connection.sql(query)
         return self.connection.sql(
             f"""
             SELECT * FROM read_parquet('{file_path}/**')
Expand Down Expand Up @@ -1378,6 +1364,7 @@ def _construct_ways_linestrings(
self._save_parquet_file(
relation=ways_with_linestrings,
file_path=destination_dir_path / f"group={group}",
run_in_separate_process=True,
)
self._delete_directories(current_ways_group_path)

Expand Down Expand Up @@ -2106,3 +2093,33 @@ def _parse_features_relation_to_groups(
)

return grouped_features_relation


def _set_up_duckdb_connection(tmp_dir_path: Path) -> "duckdb.DuckDBPyConnection":
connection = duckdb.connect(
database=str(tmp_dir_path / "db.duckdb"),
config=dict(preserve_insertion_order=False),
)
for extension_name in ("parquet", "spatial"):
connection.install_extension(extension_name)
connection.load_extension(extension_name)

connection.sql(
"""
CREATE OR REPLACE MACRO linestring_to_linestring_wkt(ls) AS
'LINESTRING (' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || ')';
"""
)
connection.sql(
"""
CREATE OR REPLACE MACRO linestring_to_polygon_wkt(ls) AS
'POLYGON ((' || array_to_string([pt.x || ' ' || pt.y for pt in ls], ', ') || '))';
"""
)

return connection


def _run_query(query: str, tmp_dir_path: Path) -> None:
conn = _set_up_duckdb_connection(tmp_dir_path=tmp_dir_path)
conn.sql(query)
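
A note on why these helpers moved to module level: multiprocessing has to pickle the callable it sends to the pool, and a bound method dragging a live DuckDB connection along cannot be pickled, so the worker rebuilds its own connection from the on-disk database instead. A condensed, self-contained sketch of the pattern this commit introduces follows; the temporary directory and the heavy_copy_query stand-in are illustrative, not taken from the repository.

    import tempfile
    from multiprocessing import Pool
    from pathlib import Path

    import duckdb

    def run_query_in_worker(query: str, db_dir: Path) -> None:
        # Only the query string and a path cross the process boundary;
        # the worker opens its own connection to the database file.
        connection = duckdb.connect(database=str(db_dir / "db.duckdb"))
        connection.sql(query)

    if __name__ == "__main__":
        with tempfile.TemporaryDirectory() as tmp_dir:
            db_dir = Path(tmp_dir)
            # Illustrative stand-in for the COPY ... TO Parquet statement
            # built in _save_parquet_file.
            heavy_copy_query = "CREATE TABLE numbers AS SELECT * FROM range(1000)"
            with Pool(processes=1) as pool:
                result = pool.apply_async(run_query_in_worker, args=(heavy_copy_query, db_dir))
                # get() blocks until the worker finishes and re-raises any
                # exception raised inside it; when the worker exits, all
                # memory the query consumed is returned to the OS.
                result.get()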
1 change: 0 additions & 1 deletion tests/base/test_pbf_file_reader.py
@@ -489,7 +489,6 @@ def check_if_relation_in_osm_is_valid_based_on_geometry(pbf_file: str, relation_
                 COALESCE(r.ref_role, 'outer') as ref_role,
                 r.ref,
             FROM unnested_relation_way_refs r
-            ORDER BY r.id, r.ref_idx
         ),
         unnested_way_refs AS (
             SELECT
7 changes: 4 additions & 3 deletions tests/benchmark/test_big_file.py
@@ -20,6 +20,7 @@ def test_big_file(extract_name: str) -> None:
         f"https://download.geofabrik.de/europe/{extract_name}-latest.osm.pbf", str(file_name)
     )
 
-    PbfFileReader(working_directory=files_dir).convert_pbf_to_gpq(
-        pbf_path=file_name, ignore_cache=True
-    )
+    reader = PbfFileReader(working_directory=files_dir)
+    # Reset rows_per_bucket value to test automatic downscaling
+    reader.rows_per_bucket = PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG[24]
+    reader.convert_pbf_to_gpq(pbf_path=file_name, ignore_cache=True)
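
For orientation, a minimal end-to-end usage sketch of the reader this benchmark exercises; the directory and extract name are illustrative placeholders, not values from the repository.

    from pathlib import Path

    from quackosm.pbf_file_reader import PbfFileReader

    files_dir = Path("files")  # hypothetical working directory
    reader = PbfFileReader(working_directory=files_dir)
    # __init__ already picked rows_per_bucket from total RAM via psutil;
    # override it, as the benchmark does, only to force a specific tier.
    reader.rows_per_bucket = PbfFileReader.ROWS_PER_BUCKET_MEMORY_CONFIG[24]
    reader.convert_pbf_to_gpq(
        pbf_path=files_dir / "monaco-latest.osm.pbf",  # hypothetical extract
        ignore_cache=True,
    )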
