From a83523468f6183ab2ca2e72073f94e2c5aa7c413 Mon Sep 17 00:00:00 2001
From: Mustafa Tuncay
Date: Wed, 21 Feb 2024 12:27:50 +0300
Subject: [PATCH 1/4] issue-617 Base Class

---
 pdr_backend/lake/base_data_store.py           | 43 +++++++++++++++++++
 pdr_backend/lake/test/test_base_data_store.py | 20 +++++++++
 2 files changed, 63 insertions(+)
 create mode 100644 pdr_backend/lake/base_data_store.py
 create mode 100644 pdr_backend/lake/test/test_base_data_store.py

diff --git a/pdr_backend/lake/base_data_store.py b/pdr_backend/lake/base_data_store.py
new file mode 100644
index 000000000..368841df7
--- /dev/null
+++ b/pdr_backend/lake/base_data_store.py
@@ -0,0 +1,43 @@
+from hashlib import md5
+from abc import abstractmethod
+from typing import Optional, Literal
+
+import duckdb
+from enforce_typing import enforce_types
+
+
+class BaseDataStore:
+    @enforce_types
+    def __init__(self, base_directory: str):
+        """
+        Initialize a BaseDataStore instance.
+        @arguments:
+        base_directory - The base directory for the data store's files.
+        """
+
+        self.base_directory = base_directory
+        self.duckdb_conn = duckdb.connect(
+            database=f"{self.base_directory}/duckdb.db"
+        )  # Keep a persistent connection
+
+    @enforce_types
+    def _generate_view_name(self, base_path: str) -> str:
+        """
+        Generate a unique view name for a given base path.
+        @arguments:
+        base_path - The base path to generate a view name for.
+        @returns:
+        str - A unique view name.
+        """
+
+        hash_object = md5(base_path.encode())
+        return f"dataset_{hash_object.hexdigest()}"
+
+    @abstractmethod
+    def query_data(
+        self,
+        dataset_identifier: str,
+        query: str,
+        partition_type: Optional[Literal["date", "address"]] = None,
+    ):
+        pass
diff --git a/pdr_backend/lake/test/test_base_data_store.py b/pdr_backend/lake/test/test_base_data_store.py
new file mode 100644
index 000000000..6dd20cfc4
--- /dev/null
+++ b/pdr_backend/lake/test/test_base_data_store.py
@@ -0,0 +1,20 @@
+from pdr_backend.lake.base_data_store import BaseDataStore
+
+
+def _get_test_manager(tmpdir):
+    return BaseDataStore(str(tmpdir))
+
+
+def test__generate_view_name(tmpdir):
+    """
+    Test the _generate_view_name method.
+    """
+    test_manager = _get_test_manager(tmpdir)
+    view_name = test_manager._generate_view_name(str(tmpdir))
+
+    # check if the view name starts with "dataset_"
+    assert view_name.startswith(
+        "dataset_"
+    ), "The view name does not start with 'dataset_'"
+    # check that a 32-character md5 hex digest follows the prefix
+    assert len(view_name) == len("dataset_") + 32, "The view name is malformed"
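To make the intended extension point concrete, here is a minimal sketch of a concrete subclass. The class name ExampleDataStore and its query logic are illustrative only; they are not part of this series:

    import polars as pl

    from pdr_backend.lake.base_data_store import BaseDataStore


    class ExampleDataStore(BaseDataStore):
        """Illustrative subclass that resolves identifiers to hashed view names."""

        def query_data(self, dataset_identifier, query, partition_type=None):
            # map the caller-facing identifier to the deterministic table name
            view_name = self._generate_view_name(
                self.base_directory + dataset_identifier
            )
            # callers reference the table through a {view_name} placeholder
            return pl.DataFrame(
                self.duckdb_conn.execute(query.format(view_name=view_name)).df()
            )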
+ """ + test_manager = _get_test_manager(tmpdir) + view_name = test_manager._generate_view_name(str(tmpdir)) + + # check if the view name starts with "dataset_" + assert view_name.startswith( + "dataset_" + ), "The view name does not start with 'dataset_'" + # check if the view name continues with a hash + assert len(view_name) > 8, "The view name is too short" From f6cc990f1ea81555c1613289d4a670dd6e80dec1 Mon Sep 17 00:00:00 2001 From: Mustafa Tuncay Date: Wed, 21 Feb 2024 12:30:05 +0300 Subject: [PATCH 2/4] issue617 - Persistent Data Store --- pdr_backend/lake/persistent_data_store.py | 150 ++++++++++++++ .../lake/test/test_persistent_data_store.py | 183 ++++++++++++++++++ 2 files changed, 333 insertions(+) create mode 100644 pdr_backend/lake/persistent_data_store.py create mode 100644 pdr_backend/lake/test/test_persistent_data_store.py diff --git a/pdr_backend/lake/persistent_data_store.py b/pdr_backend/lake/persistent_data_store.py new file mode 100644 index 000000000..6dc02f45e --- /dev/null +++ b/pdr_backend/lake/persistent_data_store.py @@ -0,0 +1,150 @@ +# The PersistentDataStore class is a subclass of the Base +import os +import glob + +from enforce_typing import enforce_types +import polars as pl + +from pdr_backend.lake.base_data_store import BaseDataStore + + +class PersistentDataStore(BaseDataStore): + """ + A class to store and retrieve persistent data. + """ + + def __init__(self, base_directory: str): + """ + Initialize a PersistentDataStore instance. + @arguments: + base_directory - The base directory to store the persistent data. + """ + super().__init__(base_directory) + + @enforce_types + def _create_and_fill_table( + self, df: pl.DataFrame, dataset_identifier: str + ): # pylint: disable=unused-argument + """ + Create the dataset and insert data to the persistent dataset. + @arguments: + df - The Polars DataFrame to append. + dataset_identifier - A unique identifier for the dataset. + """ + + view_name = self._generate_view_name(self.base_directory + dataset_identifier) + + # self.duckdb_conn.register(view_name, df) + # Create the table + self.duckdb_conn.execute(f"CREATE TABLE {view_name} AS SELECT * FROM df") + + @enforce_types + def insert_to_table(self, df: pl.DataFrame, dataset_identifier: str): + """ + Insert data to an persistent dataset. + @arguments: + df - The Polars DataFrame to append. + dataset_identifier - A unique identifier for the dataset. + @example: + df = pl.DataFrame({ + "id": [1, 2, 3], + "name": ["John", "Jane", "Doe"], + "age": [25, 30, 35] + }) + insert_to_table(df, "people") + """ + + view_name = self._generate_view_name(self.base_directory + dataset_identifier) + # Check if the table exists + tables = self.duckdb_conn.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" + ).fetchall() + + if view_name in [table[0] for table in tables]: + self.duckdb_conn.execute(f"INSERT INTO {view_name} SELECT * FROM df") + else: + self._create_and_fill_table(df, dataset_identifier) + + @enforce_types + def query_data( + self, dataset_identifier: str, query: str, partition_type: None = None + ) -> pl.DataFrame: + """ + Execute a SQL query across the persistent dataset using DuckDB. + @arguments: + dataset_identifier - A unique identifier for the dataset. + query - The SQL query to execute. + @returns: + pl.DataFrame - The result of the query. 
diff --git a/pdr_backend/lake/test/test_persistent_data_store.py b/pdr_backend/lake/test/test_persistent_data_store.py
new file mode 100644
index 000000000..e97fc366b
--- /dev/null
+++ b/pdr_backend/lake/test/test_persistent_data_store.py
@@ -0,0 +1,183 @@
+import os
+import polars as pl
+from pdr_backend.lake.persistent_data_store import (
+    PersistentDataStore,
+)
+
+
+# Initialize the PersistentDataStore instance for testing
+def _get_test_manager(tmpdir):
+    example_df = pl.DataFrame(
+        {"timestamp": ["2022-01-01", "2022-02-01", "2022-03-01"], "value": [10, 20, 30]}
+    )
+    dataset_identifier = "test_df"
+
+    return [PersistentDataStore(str(tmpdir)), example_df, dataset_identifier]
+
+
+def _clean_up_test_manager(tmpdir, dataset_identifier):
+    # Clean up the test manager
+    dataset_path = str(tmpdir) + dataset_identifier  # the store concatenates with no separator
+
+    persistent_ds_instance = PersistentDataStore(str(tmpdir))
+
+    view_name = persistent_ds_instance._generate_view_name(dataset_path)
+
+    # Select tables from duckdb
+    views = persistent_ds_instance.duckdb_conn.execute(
+        "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'"
+    ).fetchall()
+
+    # Drop the view and table
+    if view_name in [table[0] for table in views]:
+        persistent_ds_instance.duckdb_conn.execute(f"DROP TABLE {view_name}")
+
+
+def _check_view_exists(tmpdir, test_manager, dataset_identifier):
+    view_name = test_manager._generate_view_name(str(tmpdir) + dataset_identifier)
+    tables = test_manager.duckdb_conn.execute(
+        "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'"
+    ).fetchall()
+    return [view_name in [table[0] for table in tables], view_name]
+
+
+def test_create_and_fill_table(tmpdir):
+    test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir)
+
+    test_manager._create_and_fill_table(example_df, dataset_identifier)
+
+    # Check if the view is registered
+    assert _check_view_exists(tmpdir, test_manager, dataset_identifier)[0]
+    _clean_up_test_manager(tmpdir, dataset_identifier)
+
+
+def test_insert_to_existing_table(tmpdir):
+    test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir)
+
+    test_manager._create_and_fill_table(example_df, dataset_identifier)
+
+    # Check if the view is registered
+    check_result, view_name = _check_view_exists(
+        tmpdir, test_manager, dataset_identifier
+    )
+    assert check_result
+
+    # Insert new data to the table
+    example_df = pl.DataFrame(
+        {"timestamp": ["2022-04-01", "2022-05-01", "2022-06-01"], "value": [40, 50, 60]}
+    )
+    test_manager.insert_to_table(example_df, dataset_identifier)
+
+    # Check if the view is registered
+    check_result, view_name = _check_view_exists(
+        tmpdir, test_manager, dataset_identifier
+    )
+    assert check_result
+
+    # Check if the new data is inserted
+    result = test_manager.duckdb_conn.execute(f"SELECT * FROM {view_name}").fetchall()
+    assert len(result) == 6
+    # rows 3..5 hold the newly appended batch
+    assert result[3][0] == "2022-04-01"
+    assert result[3][1] == 40
+    assert result[4][0] == "2022-05-01"
+    assert result[4][1] == 50
+    assert result[5][0] == "2022-06-01"
+    assert result[5][1] == 60
+    _clean_up_test_manager(tmpdir, dataset_identifier)
+
+
+def test_insert_to_new_table(tmpdir):
+    test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir)
+
+    test_manager.insert_to_table(example_df, dataset_identifier)
+
+    # Check if the view is registered
+    check_result, view_name = _check_view_exists(
+        tmpdir, test_manager, dataset_identifier
+    )
+    assert check_result
+
+    # Check if the new data is inserted
+    result = test_manager.duckdb_conn.execute(f"SELECT * FROM {view_name}").fetchall()
+    assert len(result) == 3
+    assert result[0][0] == "2022-01-01"
+    assert result[0][1] == 10
+    assert result[1][0] == "2022-02-01"
+    assert result[1][1] == 20
+    assert result[2][0] == "2022-03-01"
+    assert result[2][1] == 30
+    _clean_up_test_manager(tmpdir, dataset_identifier)
+
+
+def test_query_data(tmpdir):
+    test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir)
+    test_manager.insert_to_table(example_df, dataset_identifier)
+
+    # Check if the view is registered
+    check_result, _ = _check_view_exists(tmpdir, test_manager, dataset_identifier)
+    assert check_result
+
+    # Execute the provided SQL query
+    result_df = test_manager.query_data(
+        dataset_identifier, "SELECT * FROM {view_name} WHERE value > 15"
+    )
+    assert len(result_df) == 2, "Query did not return the expected number of rows."
+    _clean_up_test_manager(tmpdir, dataset_identifier)
+
+
+def test_drop_table(tmpdir):
+    test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir)
+
+    test_manager.insert_to_table(example_df, dataset_identifier)
+
+    # Check if the view is registered
+    check_result, view_name = _check_view_exists(
+        tmpdir, test_manager, dataset_identifier
+    )
+    assert check_result
+
+    # Drop the table
+    test_manager.drop_table(dataset_identifier, ds_type="table")
+
+    # Check if the view is dropped
+    tables = test_manager.duckdb_conn.execute(
+        "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'"
+    ).fetchall()
+    assert view_name not in [table[0] for table in tables]
+    _clean_up_test_manager(tmpdir, dataset_identifier)
+
+
+def test_fill_from_csv_destination(tmpdir):
+    test_manager, example_df, dataset_identifier = _get_test_manager(tmpdir)
+    csv_folder_path = os.path.join(str(tmpdir), "csv_folder")
+    os.makedirs(csv_folder_path, exist_ok=True)
+    example_df.write_csv(os.path.join(str(csv_folder_path), "data.csv"))
+
+    test_manager.fill_from_csv_destination(csv_folder_path, dataset_identifier)
+
+    # Check if the view is registered
+    check_result, view_name = _check_view_exists(
+        tmpdir, test_manager, dataset_identifier
+    )
+    assert check_result
+
+    # Check if the new data is inserted
+    result = test_manager.duckdb_conn.execute(f"SELECT * FROM {view_name}").fetchall()
+    assert len(result) == 3
+    assert result[0][0] == "2022-01-01"
+    assert result[0][1] == 10
+    assert result[1][0] == "2022-02-01"
+    assert result[1][1] == 20
+    assert result[2][0] == "2022-03-01"
+    assert result[2][1] == 30
+
+    _clean_up_test_manager(tmpdir, dataset_identifier)
+    # clean csv folder
+    # delete files in the folder
+    for file in os.listdir(csv_folder_path):
+        file_path = os.path.join(csv_folder_path, file)
+        os.remove(file_path)
+
+    # delete the folder
+    os.rmdir(csv_folder_path)
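update_data() builds its UPDATE statement by string formatting, which assumes every cell's text form can stand as a quoted SQL literal. DuckDB's Python API also accepts prepared-statement parameters, which sidesteps quoting entirely. A sketch against an illustrative in-memory table:

    import duckdb

    conn = duckdb.connect()  # in-memory database
    conn.execute("CREATE TABLE people (id INTEGER, name VARCHAR, age INTEGER)")
    conn.execute("INSERT INTO people VALUES (1, 'John', 25)")
    # "?" placeholders are bound by DuckDB, so values need no manual escaping
    conn.execute("UPDATE people SET name = ?, age = ? WHERE id = ?", ["Johnny", 26, 1])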
From ac81100506a7347da602127aece848b121cb41b3 Mon Sep 17 00:00:00 2001
From: Mustafa Tuncay
Date: Wed, 21 Feb 2024 12:34:32 +0300
Subject: [PATCH 3/4] duckdb dependency is added to setup.py

---
 setup.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/setup.py b/setup.py
index 877bdb137..2237921b2 100644
--- a/setup.py
+++ b/setup.py
@@ -12,6 +12,7 @@
     "bumpversion",
     "ccxt>=4.1.59",
     "coverage",
+    "duckdb",
     "enforce_typing",
     "eth-account",
     "eth-keys",
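A quick local check that the new dependency resolves after "pip install -e ." (not part of the test suite):

    import duckdb

    print(duckdb.__version__)
    assert duckdb.connect().execute("SELECT 42").fetchone() == (42,)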
From 6ffc626aa34cefde86a4e4ac136817d516f29073 Mon Sep 17 00:00:00 2001
From: Mustafa Tuncay
Date: Tue, 27 Feb 2024 14:15:02 +0300
Subject: [PATCH 4/4] dry fix

---
 pdr_backend/lake/base_data_store.py                 |  3 ++-
 pdr_backend/lake/persistent_data_store.py           | 10 +++++-----
 pdr_backend/lake/test/test_persistent_data_store.py |  2 +-
 3 files changed, 8 insertions(+), 7 deletions(-)

diff --git a/pdr_backend/lake/base_data_store.py b/pdr_backend/lake/base_data_store.py
index 368841df7..70c88e518 100644
--- a/pdr_backend/lake/base_data_store.py
+++ b/pdr_backend/lake/base_data_store.py
@@ -30,7 +30,8 @@ def _generate_view_name(self, base_path: str) -> str:
         str - A unique view name.
         """
 
-        hash_object = md5(base_path.encode())
+        path = f"{self.base_directory}/{base_path}"
+        hash_object = md5(path.encode())
         return f"dataset_{hash_object.hexdigest()}"
 
     @abstractmethod
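With this change, callers pass only a dataset identifier and the store prepends its own base_directory before hashing, so each identifier maps to one stable table name per store. A sketch (directories are illustrative and must already exist, since the constructor opens a DuckDB file there):

    from pdr_backend.lake.base_data_store import BaseDataStore

    store = BaseDataStore("/tmp/pdr_lake")
    # same identifier, same store -> same md5-derived name
    assert store._generate_view_name("people") == store._generate_view_name("people")
    # a store rooted elsewhere hashes a different path -> different name
    other = BaseDataStore("/tmp/pdr_lake_2")
    assert other._generate_view_name("people") != store._generate_view_name("people")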
""" - hash_object = md5(base_path.encode()) + path = f"{self.base_directory}/{base_path}" + hash_object = md5(path.encode()) return f"dataset_{hash_object.hexdigest()}" @abstractmethod diff --git a/pdr_backend/lake/persistent_data_store.py b/pdr_backend/lake/persistent_data_store.py index 6dc02f45e..b87c807f6 100644 --- a/pdr_backend/lake/persistent_data_store.py +++ b/pdr_backend/lake/persistent_data_store.py @@ -32,7 +32,7 @@ def _create_and_fill_table( dataset_identifier - A unique identifier for the dataset. """ - view_name = self._generate_view_name(self.base_directory + dataset_identifier) + view_name = self._generate_view_name(dataset_identifier) # self.duckdb_conn.register(view_name, df) # Create the table @@ -54,7 +54,7 @@ def insert_to_table(self, df: pl.DataFrame, dataset_identifier: str): insert_to_table(df, "people") """ - view_name = self._generate_view_name(self.base_directory + dataset_identifier) + view_name = self._generate_view_name(dataset_identifier) # Check if the table exists tables = self.duckdb_conn.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" @@ -80,7 +80,7 @@ def query_data( query_data("people", "SELECT * FROM {view_name}") """ - view_name = self._generate_view_name(self.base_directory + dataset_identifier) + view_name = self._generate_view_name(dataset_identifier) result_df = self.duckdb_conn.execute(query.format(view_name=view_name)).df() return pl.DataFrame(result_df) @@ -99,7 +99,7 @@ def drop_table(self, dataset_identifier: str, ds_type: str = "table"): if ds_type not in ["view", "table"]: raise ValueError("ds_type must be either 'view' or 'table'") - view_name = self._generate_view_name(self.base_directory + dataset_identifier) + view_name = self._generate_view_name(dataset_identifier) self.duckdb_conn.execute(f"DROP {ds_type} {view_name}") @enforce_types @@ -139,7 +139,7 @@ def update_data( update_data(df, "people", "id") """ - view_name = self._generate_view_name(self.base_directory + dataset_identifier) + view_name = self._generate_view_name(dataset_identifier) update_columns = ", ".join( [f"{column} = {df[column]}" for column in df.columns] ) diff --git a/pdr_backend/lake/test/test_persistent_data_store.py b/pdr_backend/lake/test/test_persistent_data_store.py index e97fc366b..33549b986 100644 --- a/pdr_backend/lake/test/test_persistent_data_store.py +++ b/pdr_backend/lake/test/test_persistent_data_store.py @@ -34,7 +34,7 @@ def _clean_up_test_manager(tmpdir, dataset_identifier): def _check_view_exists(tmpdir, test_manager, dataset_identifier): - view_name = test_manager._generate_view_name(str(tmpdir) + dataset_identifier) + view_name = test_manager._generate_view_name(dataset_identifier) tables = test_manager.duckdb_conn.execute( "SELECT table_name FROM information_schema.tables WHERE table_schema = 'main'" ).fetchall()