Skip to content

Commit

Permalink
Reorder functions and use parametrised testing to check both implementations
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexandraImbrisca committed Jan 14, 2025
1 parent efd9560 commit 1117af7
Show file tree
Hide file tree
Showing 2 changed files with 170 additions and 113 deletions.
190 changes: 101 additions & 89 deletions open_mastr/xml_download/utils_write_to_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import numpy as np
import pandas as pd
import sqlalchemy
from sqlalchemy import inspect
from sqlalchemy import inspect, select
from sqlalchemy.sql import text
from sqlalchemy.sql.sqltypes import Date, DateTime

Expand Down Expand Up @@ -112,6 +112,20 @@ def is_first_file(file_name: str) -> bool:
)


def cast_date_columns_to_datetime(
    xml_table_name: str, df: pd.DataFrame
) -> pd.DataFrame:
    """Convert the date-like columns of ``df`` to pandas datetimes.

    The columns of the ORM table registered for ``xml_table_name`` in
    ``tablename_mapping`` are walked, and every column accepted by
    ``is_date_column`` is converted with ``pd.to_datetime``.

    Parameters
    ----------
    xml_table_name
        Key into ``tablename_mapping`` that selects the ORM class.
    df
        Dataframe whose columns are converted; it is modified in place.

    Returns
    -------
    The dataframe that was passed in, with the matching columns converted.
    """
    orm_table = tablename_mapping[xml_table_name]["__class__"].__table__
    for column_item in orm_table.columns.items():
        if not is_date_column(column_item, df):
            continue
        # The first element of each item is used as the dataframe column name.
        name = column_item[0]
        # errors="coerce": values that cannot be parsed become NaT
        df[name] = pd.to_datetime(df[name], errors="coerce")
    return df


def cast_date_columns_to_string(xml_table_name: str, df: pd.DataFrame) -> pd.DataFrame:
column_list = tablename_mapping[xml_table_name][
"__class__"
Expand Down Expand Up @@ -185,6 +199,52 @@ def change_column_names_to_orm_format(
return df


def add_table_to_non_sqlite_database(
    df: pd.DataFrame,
    xml_table_name: str,
    sql_table_name: str,
    engine: sqlalchemy.engine.Engine,
) -> None:
    """Append ``df`` to the table ``sql_table_name`` via ``DataFrame.to_sql``.

    Before writing, date columns are cast to datetime and
    ``add_missing_columns_to_table`` is called with the dataframe's columns.
    The write is attempted at most 10000 times; after a ``DataError`` or an
    ``IntegrityError`` the corresponding handler runs and the write is tried
    again. If no attempt succeeds the function returns without raising.

    Parameters
    ----------
    df
        Rows to append.
    xml_table_name
        Key into ``tablename_mapping`` that selects the ORM class.
    sql_table_name
        Name of the target table handed to ``to_sql``.
    engine
        Engine used to open the connection for each attempt.
    """
    orm_table = tablename_mapping[xml_table_name]["__class__"].__table__

    # Column name -> SQLAlchemy type, restricted to columns present in df.
    dtypes_for_writing_sql = {}
    for orm_column in orm_table.columns:
        if orm_column.name in df.columns:
            dtypes_for_writing_sql[orm_column.name] = orm_column.type

    # Date and datetime columns are written as datetime values.
    df = cast_date_columns_to_datetime(xml_table_name, df)

    add_missing_columns_to_table(
        engine, xml_table_name, column_list=df.columns.tolist()
    )

    attempts_left = 10000
    while attempts_left > 0:
        attempts_left -= 1
        try:
            with engine.connect() as con, con.begin():
                df.to_sql(
                    sql_table_name,
                    con=con,
                    index=False,
                    if_exists="append",
                    dtype=dtypes_for_writing_sql,
                )
        except sqlalchemy.exc.DataError as err:
            # NOTE(review): the return value is discarded here; confirm that
            # delete_wrong_xml_entry modifies df in place.
            delete_wrong_xml_entry(err, df)
        except sqlalchemy.exc.IntegrityError:
            # Unique constraint failed: drop the rows whose primary key is
            # already stored, then try again with the remainder.
            df = write_single_entries_until_not_unique_comes_up(
                df, xml_table_name, engine
            )
        else:
            # The write went through; stop retrying.
            break


def add_zero_as_first_character_for_too_short_string(df: pd.DataFrame) -> pd.DataFrame:
"""Some columns are read as integer even though they are actually strings starting with
a 0. This function converts those columns back to strings and adds a 0 as first character.
Expand Down Expand Up @@ -217,6 +277,46 @@ def add_zero_as_first_character_for_too_short_string(df: pd.DataFrame) -> pd.Dat
return df


def write_single_entries_until_not_unique_comes_up(
    df: pd.DataFrame, xml_table_name: str, engine: sqlalchemy.engine.Engine
) -> pd.DataFrame:
    """Remove the rows of ``df`` whose primary key already exists in the table.

    Rows with a primary key repeated inside ``df`` are dropped first (the
    first occurrence is kept); afterwards every row whose primary key is
    already stored in the database table is dropped as well.

    Parameters
    ----------
    df
        Dataframe holding the rows that should be written.
    xml_table_name
        Key into ``tablename_mapping`` that selects the ORM class and table.
    engine
        Engine used to read the primary keys currently stored in the table.

    Returns
    -------
    Filtered dataframe
    """
    table = tablename_mapping[xml_table_name]["__class__"].__table__
    primary_key = next(c for c in table.columns if c.primary_key)

    with engine.connect() as con:
        with con.begin():
            existing_keys = pd.read_sql(sql=select(primary_key), con=con)

    # Read the single result column directly so the result is always a list.
    # ``.values.squeeze().tolist()`` collapses a one-row result to a bare
    # scalar, which only works further down because ``drop`` also accepts a
    # single label.
    key_list = existing_keys.iloc[:, 0].tolist()

    len_df_before = len(df)
    # drop all entries with duplicated primary keys in the dataframe
    df = df.drop_duplicates(subset=[primary_key.name])
    df = df.set_index(primary_key.name)

    # drop primary keys that already exist in the table; keys that are not
    # part of the dataframe are skipped via errors="ignore"
    df = df.drop(labels=key_list, errors="ignore")
    df = df.reset_index()
    # The reported count also includes duplicates removed within df itself.
    print(f"{len_df_before - len(df)} entries already existed in the database.")

    return df


def add_missing_columns_to_table(
engine: sqlalchemy.engine.Engine,
xml_table_name: str,
Expand Down Expand Up @@ -356,91 +456,3 @@ def add_table_to_sqlite_database(
break
except sqlalchemy.exc.DataError as err:
delete_wrong_xml_entry(err, df)


def write_single_entries_until_not_unique_comes_up(
    df: pd.DataFrame, xml_table_name: str, engine: sqlalchemy.engine.Engine
) -> pd.DataFrame:
    """Remove the rows of ``df`` whose primary key already exists in the table.

    Rows with a primary key repeated inside ``df`` are dropped first (the
    first occurrence is kept); afterwards every row whose primary key is
    already stored in the database table is dropped as well.

    Parameters
    ----------
    df
        Dataframe holding the rows that should be written.
    xml_table_name
        Key into ``tablename_mapping`` that selects the ORM class and table.
    engine
        Engine used to read the primary keys currently stored in the table.

    Returns
    -------
    Filtered dataframe
    """
    table = tablename_mapping[xml_table_name]["__class__"].__table__
    primary_key = next(c for c in table.columns if c.primary_key)

    with engine.connect() as con:
        with con.begin():
            existing_keys = pd.read_sql(sql=select(primary_key), con=con)

    # Read the single result column directly so the result is always a list.
    # ``.values.squeeze().tolist()`` collapses a one-row result to a bare
    # scalar, which only works further down because ``drop`` also accepts a
    # single label.
    key_list = existing_keys.iloc[:, 0].tolist()

    len_df_before = len(df)
    # drop all entries with duplicated primary keys in the dataframe
    df = df.drop_duplicates(subset=[primary_key.name])
    df = df.set_index(primary_key.name)

    # drop primary keys that already exist in the table; keys that are not
    # part of the dataframe are skipped via errors="ignore"
    df = df.drop(labels=key_list, errors="ignore")
    df = df.reset_index()
    # The reported count also includes duplicates removed within df itself.
    print(f"{len_df_before - len(df)} entries already existed in the database.")

    return df


def add_table_to_non_sqlite_database(
df: pd.DataFrame,
xml_table_name: str,
sql_table_name: str,
engine: sqlalchemy.engine.Engine,
) -> None:
def add_table_to_database(
    df: pd.DataFrame,
    xml_table_name: str,
    sql_table_name: str,
    engine: sqlalchemy.engine.Engine,
) -> None:
    """Append ``df`` to the table ``sql_table_name`` via ``DataFrame.to_sql``.

    ``add_missing_columns_to_table`` is called with the dataframe's columns
    before writing. The write is attempted at most 10000 times; after a
    ``DataError`` or an ``IntegrityError`` the corresponding handler runs and
    the write is tried again. If no attempt succeeds the function returns
    without raising.

    Parameters
    ----------
    df
        Rows to append.
    xml_table_name
        Key into ``tablename_mapping`` that selects the ORM class.
    sql_table_name
        Name of the target table handed to ``to_sql``.
    engine
        Engine used to open the connection for each attempt.
    """
    orm_table = tablename_mapping[xml_table_name]["__class__"].__table__

    # Column name -> SQLAlchemy type, restricted to columns present in df.
    dtypes_for_writing_sql = {}
    for orm_column in orm_table.columns:
        if orm_column.name in df.columns:
            dtypes_for_writing_sql[orm_column.name] = orm_column.type

    add_missing_columns_to_table(
        engine, xml_table_name, column_list=df.columns.tolist()
    )

    attempts_left = 10000
    while attempts_left > 0:
        attempts_left -= 1
        try:
            with engine.connect() as con, con.begin():
                df.to_sql(
                    sql_table_name,
                    con=con,
                    index=False,
                    if_exists="append",
                    dtype=dtypes_for_writing_sql,
                )
        except sqlalchemy.exc.DataError as err:
            # NOTE(review): the return value is discarded here; confirm that
            # delete_wrong_xml_entry modifies df in place.
            delete_wrong_xml_entry(err, df)
        except sqlalchemy.exc.IntegrityError:
            # Unique constraint failed: drop the rows whose primary key is
            # already stored, then try again with the remainder.
            df = write_single_entries_until_not_unique_comes_up(
                df, xml_table_name, engine
            )
        else:
            # The write went through; stop retrying.
            break
Loading

0 comments on commit 1117af7

Please sign in to comment.