feat: add advanced wide form functions #26

Merged · 53 commits · Jan 14, 2025

Commits
e10646b
refactor: change available release versions caching
RaczeQ Nov 5, 2024
ed43112
feat: added timing decorator for aggregating total time with nested c…
RaczeQ Nov 5, 2024
32b5591
feat: added columns_to_download functionality
RaczeQ Nov 7, 2024
a4c7caf
chore: added dummy advanced functions file structure
RaczeQ Nov 7, 2024
6ec6e9e
chore: add pragma no cover
RaczeQ Nov 8, 2024
40c74f7
fix: change test working directory
RaczeQ Nov 8, 2024
aaefd02
chore: prepare theme_type classification for wide form
RaczeQ Nov 8, 2024
f8a711f
chore: add wide form functions base code
RaczeQ Nov 13, 2024
73d0ee7
fix(pre-commit.ci): auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Nov 13, 2024
1b7241a
feat: add working wide form logic for default cases
RaczeQ Nov 24, 2024
550b725
chore: make multiprocessing manager a singleton
RaczeQ Nov 24, 2024
42e3615
chore: cleaned nested Path call
RaczeQ Nov 24, 2024
8784c65
feat: added working buildings with parts download logic
RaczeQ Nov 24, 2024
fab6343
feat: refactor internal data download to work with multiple theme typ…
RaczeQ Dec 4, 2024
d4e38ea
feat: add dedicated function for downloading buildings with parts
RaczeQ Dec 4, 2024
f837525
Merge branch 'main' into 10-add-some-dedicated-functions-for-highways…
RaczeQ Dec 4, 2024
9098f12
fix: remove prints from functions
RaczeQ Dec 4, 2024
9f06060
fix: change windows test path
RaczeQ Dec 4, 2024
3863a81
feat: finish building and poi wide format logic
RaczeQ Dec 8, 2024
baa67c9
feat: add theme and type metadata info
RaczeQ Dec 8, 2024
f63f589
chore: remove unnecessary download buildings function
RaczeQ Dec 10, 2024
75be4f7
feat: add option to download multiple theme type datasets at once
RaczeQ Dec 14, 2024
e229336
chore: remove todo comment
RaczeQ Jan 1, 2025
ee64d30
Merge branch 'main' into 10-add-some-dedicated-functions-for-highways…
RaczeQ Jan 5, 2025
0c8f436
chore: tidy up nested context managers
RaczeQ Jan 5, 2025
90d5345
chore: refactor wide form data download logic
RaczeQ Jan 7, 2025
eef076f
feat: add option to download multiple datasets at once
RaczeQ Jan 7, 2025
a4a1c45
feat: finish wide form combination of multiple datasets
RaczeQ Jan 8, 2025
c30e79a
chore: changed destination directory and added max_workers parameter
RaczeQ Jan 8, 2025
a849471
chore: reorder functions
RaczeQ Jan 8, 2025
bb8f22e
feat: added option to pass list of pyarrow filters and columns to dow…
RaczeQ Jan 8, 2025
19205d8
chore: added changelog entry
RaczeQ Jan 8, 2025
fa819d2
chore: refactor wide form code
RaczeQ Jan 8, 2025
fe8417b
fix: split hierarchy and download columns
RaczeQ Jan 8, 2025
6daa5d0
fix: remove additional raise clause
RaczeQ Jan 9, 2025
6e1566d
feat: add missing progress bars and hierarchy to result file name
RaczeQ Jan 9, 2025
ef69a48
chore: clear wide_form file
RaczeQ Jan 10, 2025
9017843
feat: finalize wide_form api
RaczeQ Jan 12, 2025
b095dc1
feat: change nested fields pyarrow filter separator in cli
RaczeQ Jan 12, 2025
b92831d
feat: add minimal confidence for poi filtering
RaczeQ Jan 13, 2025
b6142be
chore: change pyarrow_filters typing
RaczeQ Jan 13, 2025
5f88363
chore: change pyarrow filters and columns length checking
RaczeQ Jan 13, 2025
9476333
chore: add wide form tests
RaczeQ Jan 14, 2025
01e4fae
chore: add todo comment
RaczeQ Jan 14, 2025
cf5d7b5
fix: reorder columns in wide form
RaczeQ Jan 14, 2025
69b6101
chore: finish wide form tests
RaczeQ Jan 14, 2025
b1201e7
feat: add automatic pyarrow filter fields names detection
RaczeQ Jan 14, 2025
b88c688
chore: reorder columns select in sql
RaczeQ Jan 14, 2025
79ffc8d
feat: finish wide form tests
RaczeQ Jan 14, 2025
70e67e7
chore: increase tests coverage
RaczeQ Jan 14, 2025
7ddabc0
chore: simplify _download_data function
RaczeQ Jan 14, 2025
1983906
chore: add todo comment for tests
RaczeQ Jan 14, 2025
5dd79eb
chore: add todo comment for tests
RaczeQ Jan 14, 2025

Files changed
15 changes: 15 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

## [Unreleased]

### Added

- Automatic total time wrapper decorator to aggregate nested function calls
- Parameter `columns_to_download` for selecting columns to download from the dataset [#23](https://github.com/kraina-ai/overturemaestro/issues/23)
- Option to pass a list of pyarrow filters and columns for download for each theme type pair when downloading multiple datasets at once

### Changed

- Refactored available release versions caching [#24](https://github.com/kraina-ai/overturemaestro/issues/24)
- Removed hive partitioned parquet schema columns from GeoDataFrame loading

### Deprecated

- Nested fields in PyArrow filters in the CLI are now expected to be separated by a dot, not a comma [#22](https://github.com/kraina-ai/overturemaestro/issues/22)

## [0.1.2] - 2024-12-17

### Added
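For context on the filter entries above: the sketch below is illustrative only and not code from this PR. It shows how a nested field such as `categories.primary` is addressed with `pyarrow.compute` expressions, which is the programmatic counterpart of the new dot-separated CLI syntax; the field names used (`categories.primary`, `confidence`) are examples from the Overture places schema and may differ between releases. When downloading multiple datasets at once, one such filter (and one column list) can be supplied per theme/type pair, as noted in the Added section.

# Illustrative sketch only; not part of this PR.
import pyarrow.compute as pc

# Expression-style filter: a nested field is referenced by passing its path components.
nested_filter = pc.field("categories", "primary") == "eat_and_drink"

# In the CLI (after this PR) the same nested field would be written as
# `categories.primary` (dot separator) instead of the previous comma-separated form.
combined_filter = nested_filter & (pc.field("confidence") >= 0.75)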
2 changes: 2 additions & 0 deletions README.md
@@ -80,6 +80,8 @@ Required:

- `geoarrow-rust-core (>=0.3.0)`: For transforming Arrow data to Shapely objects

- `duckdb (>=1.1.0)`: For transforming downloaded data to the wide format

- `pooch (>=1.6.0)`: For downloading precalculated dataset indexes

- `rich (>=12.0.0)`: For showing progress bars
1 change: 0 additions & 1 deletion overturemaestro/__main__.py
@@ -6,7 +6,6 @@ def main() -> None:
try:
from overturemaestro import __app_name__, cli
except ImportError as exc:
raise
error_msg = (
"Missing optional dependencies required for the CLI."
" Please install required packages using `pip install overturemaestro[cli]`."
27 changes: 27 additions & 0 deletions overturemaestro/_duckdb.py
@@ -0,0 +1,27 @@
"""Helper functions for DuckDB."""

from pathlib import Path
from typing import Union

import duckdb


def _sql_escape(value: str) -> str:
"""Escape value for SQL query."""
return value.replace("'", "''")


def _set_up_duckdb_connection(tmp_dir_path: Union[str, Path]) -> "duckdb.DuckDBPyConnection":
"""Create DuckDB connection in a given directory."""
local_db_file = "db.duckdb"
connection = duckdb.connect(
database=str(Path(tmp_dir_path) / local_db_file),
config=dict(preserve_insertion_order=False),
)
connection.sql("SET enable_progress_bar = false;")
connection.sql("SET enable_progress_bar_print = false;")

connection.install_extension("spatial")
connection.load_extension("spatial")

return connection
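A minimal usage sketch of the two helpers above (an internal API, so subject to change). The `preserve_insertion_order=False` setting and the disabled DuckDB progress bar are presumably chosen to lower memory pressure and to avoid interfering with the library's own rich progress bars.

# Minimal sketch using the helpers above; internal API, shown here for illustration.
from tempfile import TemporaryDirectory

from overturemaestro._duckdb import _set_up_duckdb_connection, _sql_escape

with TemporaryDirectory() as tmp_dir:
    connection = _set_up_duckdb_connection(tmp_dir)  # creates db.duckdb inside tmp_dir

    # _sql_escape doubles single quotes so user-provided strings can be inlined safely.
    category = "McDonald's"
    rows = connection.sql(f"SELECT '{_sql_escape(category)}' AS escaped_value").fetchall()
    print(rows)  # [("McDonald's",)]

    connection.close()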
6 changes: 6 additions & 0 deletions overturemaestro/_exceptions.py
@@ -1 +1,7 @@
class QueryNotGeocodedError(ValueError): ...


class MissingColumnError(ValueError): ...


class HierarchyDepthOutOfBoundsError(ValueError): ...
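All three exceptions subclass `ValueError`, so callers can handle them individually or fall back to a generic `ValueError` handler. The conditions that raise them live elsewhere in the library; the snippet below only sketches the catching side.

# Sketch of the catching side; the raising conditions are defined elsewhere in the library.
from overturemaestro._exceptions import (
    HierarchyDepthOutOfBoundsError,
    MissingColumnError,
    QueryNotGeocodedError,
)

try:
    ...  # call into overturemaestro here
except HierarchyDepthOutOfBoundsError:
    print("Requested hierarchy depth is outside the available range.")
except (MissingColumnError, QueryNotGeocodedError) as exc:
    # Both also derive from ValueError, so `except ValueError` would cover them too.
    print(f"Invalid request: {exc}")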
58 changes: 38 additions & 20 deletions overturemaestro/_parquet_multiprocessing.py
@@ -1,12 +1,13 @@
import multiprocessing
from multiprocessing.managers import SyncManager
from pathlib import Path
from queue import Empty, Queue
from time import sleep, time
from typing import TYPE_CHECKING, Any, Callable, Optional, Union
from typing import TYPE_CHECKING, Any, Callable, Optional, Union, cast

from overturemaestro._rich_progress import VERBOSITY_MODE, TrackProgressSpinner

if TYPE_CHECKING:
if TYPE_CHECKING: # pragma: no cover
from multiprocessing.managers import ValueProxy
from threading import Lock

@@ -31,13 +32,14 @@ def _job(
columns: Optional[list[str]],
filesystem: "fs.FileSystem",
) -> None: # pragma: no cover
import hashlib

import pyarrow.dataset as ds
import pyarrow.parquet as pq

current_pid = multiprocessing.current_process().pid

filepath = save_path / f"{current_pid}.parquet"
writer = None
writers = {}
while not queue.empty():
try:
file_name, row_group_index = None, None
@@ -61,10 +63,16 @@
tracker.value += 1
continue

if not writer:
writer = pq.ParquetWriter(filepath, result_table.schema)
h = hashlib.new("sha256")
h.update(result_table.schema.to_string().encode())
schema_hash = h.hexdigest()

if schema_hash not in writers:
filepath = save_path / str(current_pid) / f"{schema_hash}.parquet"
filepath.parent.mkdir(exist_ok=True, parents=True)
writers[schema_hash] = pq.ParquetWriter(filepath, result_table.schema)

writer.write_table(result_table)
writers[schema_hash].write_table(result_table)

with tracker_lock:
tracker.value += 1
@@ -80,7 +88,7 @@ def _job(
)
raise MultiprocessingRuntimeError(msg) from ex

if writer:
for writer in writers.values():
writer.close()


@@ -107,14 +115,21 @@ def exception(self) -> Optional[tuple[Exception, str]]:
return self._exception


class SingletonContextManager(SyncManager):
def __new__(cls, ctx: multiprocessing.context.SpawnContext) -> "SingletonContextManager":
if not hasattr(cls, "instance"):
cls.instance = ctx.Manager()
return cast(SingletonContextManager, cls.instance)


def _read_row_group_number(path: str, filesystem: "fs.FileSystem") -> int:
import pyarrow.parquet as pq

return int(pq.ParquetFile(path, filesystem=filesystem).num_row_groups)


def map_parquet_dataset(
dataset_path: Union[str, list[str]],
dataset_path: Union[str, Path, list[str], list[Path]],
destination_path: Path,
function: Callable[[str, int, "pa.Table"], "pa.Table"],
progress_description: str,
@@ -157,7 +172,7 @@ def map_parquet_dataset(

from overturemaestro._rich_progress import TrackProgressBar

manager = ctx.Manager()
manager = SingletonContextManager(ctx=ctx)

queue: Queue[tuple[str, int]] = manager.Queue()
tracker: ValueProxy[int] = manager.Value("i", 0)
@@ -178,17 +193,20 @@
no_scan_workers = min(max_workers, no_scan_workers)
no_processing_workers = min(max_workers, no_processing_workers)

with TrackProgressBar(verbosity_mode=verbosity_mode) as progress:
total_files = len(dataset.files)
with ProcessPoolExecutor(max_workers=min(no_scan_workers, total_files)) as ex:
fn = partial(_read_row_group_number, filesystem=dataset.filesystem)
row_group_numbers = list(
progress.track(
ex.map(fn, dataset.files, chunksize=1),
description="Reading all parquet files row groups",
total=total_files,
)
total_files = len(dataset.files)

with (
TrackProgressBar(verbosity_mode=verbosity_mode) as progress,
ProcessPoolExecutor(max_workers=min(no_scan_workers, total_files)) as ex,
):
fn = partial(_read_row_group_number, filesystem=dataset.filesystem)
row_group_numbers = list(
progress.track(
ex.map(fn, dataset.files, chunksize=1),
description="Reading all parquet files row groups",
total=total_files,
)
)

for pq_file, row_group_number in zip(dataset.files, row_group_numbers):
for row_group in range(row_group_number):
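The key behavioural change in `_job` is that a worker no longer keeps a single `ParquetWriter`: result tables are grouped by a SHA-256 hash of their schema's string representation, and each distinct schema gets its own file inside a per-worker subdirectory. This matters once several theme/type datasets are processed in one run, since their mapped tables need not share a schema. Below is a standalone, simplified sketch of that bookkeeping, outside the multiprocessing and queue machinery.

# Simplified sketch of the per-schema writer bookkeeping used in _job above.
import hashlib
from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq


def write_tables_grouped_by_schema(
    tables: list[pa.Table], save_path: Path, worker_id: int
) -> None:
    writers: dict[str, pq.ParquetWriter] = {}
    for table in tables:
        # Tables with identical schemas share one output file; differing schemas
        # are routed to separate files named after the schema hash.
        schema_hash = hashlib.sha256(table.schema.to_string().encode()).hexdigest()
        if schema_hash not in writers:
            filepath = save_path / str(worker_id) / f"{schema_hash}.parquet"
            filepath.parent.mkdir(exist_ok=True, parents=True)
            writers[schema_hash] = pq.ParquetWriter(filepath, table.schema)
        writers[schema_hash].write_table(table)
    for writer in writers.values():
        writer.close()

The same file also turns the multiprocessing manager into a process-wide singleton (`SingletonContextManager`), so repeated `map_parquet_dataset` calls reuse one manager instead of spawning a new one per call.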
66 changes: 66 additions & 0 deletions overturemaestro/advanced_functions/__init__.py
@@ -0,0 +1,66 @@
"""
Advanced functions.

This module contains dedicated functions for specific use cases.
"""

# from overturemaestro.advanced_functions.poi import (
# convert_bounding_box_to_pois_geodataframe,
# convert_bounding_box_to_pois_parquet,
# convert_geometry_to_pois_geodataframe,
# convert_geometry_to_pois_parquet,
# )
# from overturemaestro.advanced_functions.transportation import (
# convert_bounding_box_to_roads_geodataframe,
# convert_bounding_box_to_roads_parquet,
# convert_geometry_to_roads_geodataframe,
# convert_geometry_to_roads_parquet,
# )
from overturemaestro.advanced_functions.functions import (
convert_bounding_box_to_wide_form_geodataframe,
convert_bounding_box_to_wide_form_geodataframe_for_all_types,
convert_bounding_box_to_wide_form_geodataframe_for_multiple_types,
convert_bounding_box_to_wide_form_parquet,
convert_bounding_box_to_wide_form_parquet_for_all_types,
convert_bounding_box_to_wide_form_parquet_for_multiple_types,
convert_geometry_to_wide_form_geodataframe,
convert_geometry_to_wide_form_geodataframe_for_all_types,
convert_geometry_to_wide_form_geodataframe_for_multiple_types,
convert_geometry_to_wide_form_parquet,
convert_geometry_to_wide_form_parquet_for_all_types,
convert_geometry_to_wide_form_parquet_for_multiple_types,
)

__all__ = [
"convert_bounding_box_to_wide_form_geodataframe",
"convert_bounding_box_to_wide_form_geodataframe_for_all_types",
"convert_bounding_box_to_wide_form_geodataframe_for_multiple_types",
"convert_bounding_box_to_wide_form_parquet",
"convert_bounding_box_to_wide_form_parquet_for_all_types",
"convert_bounding_box_to_wide_form_parquet_for_multiple_types",
"convert_geometry_to_wide_form_geodataframe",
"convert_geometry_to_wide_form_geodataframe_for_all_types",
"convert_geometry_to_wide_form_geodataframe_for_multiple_types",
"convert_geometry_to_wide_form_parquet",
"convert_geometry_to_wide_form_parquet_for_all_types",
"convert_geometry_to_wide_form_parquet_for_multiple_types",
]

# __all__ = [
# "convert_bounding_box_to_buildings_geodataframe",
# "convert_bounding_box_to_buildings_parquet",
# "convert_bounding_box_to_pois_geodataframe",
# "convert_bounding_box_to_pois_parquet",
# "convert_bounding_box_to_roads_geodataframe",
# "convert_bounding_box_to_roads_parquet",
# "convert_bounding_box_to_wide_form_geodataframe",
# "convert_bounding_box_to_wide_form_parquet",
# "convert_geometry_to_buildings_geodataframe",
# "convert_geometry_to_buildings_parquet",
# "convert_geometry_to_pois_geodataframe",
# "convert_geometry_to_pois_parquet",
# "convert_geometry_to_roads_geodataframe",
# "convert_geometry_to_roads_parquet",
# "convert_geometry_to_wide_form_geodataframe",
# "convert_geometry_to_wide_form_parquet",
# ]
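A hypothetical usage sketch of the exported wide-form API. The exact signatures are defined in `overturemaestro/advanced_functions/functions.py`, which is not part of this excerpt, so the parameter order and bounding-box form below are assumptions for illustration only; `buildings`/`building` and `places`/`place` are standard Overture theme/type names.

# Hypothetical invocation; signatures are assumptions, see the note above.
from overturemaestro.advanced_functions import (
    convert_bounding_box_to_wide_form_geodataframe,
    convert_bounding_box_to_wide_form_geodataframe_for_all_types,
)

bbox = (7.41, 43.72, 7.44, 43.75)  # minx, miny, maxx, maxy

# Single theme/type pair, pivoted into the wide form.
buildings_gdf = convert_bounding_box_to_wide_form_geodataframe("buildings", "building", bbox)

# All available theme/type datasets combined into one wide-form GeoDataFrame.
everything_gdf = convert_bounding_box_to_wide_form_geodataframe_for_all_types(bbox)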