Skip to content

Commit

Permalink
import data from environment bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
alexeh committed Dec 12, 2024
1 parent bcd30d4 commit fd8e2fb
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 27 deletions.
24 changes: 16 additions & 8 deletions data/base_data_importer/csv_to_table.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,24 @@
log = logging.getLogger("base_data_importer")
load_dotenv('../../.env')

db_host = os.getenv("API_POSTGRES_HOST")
db_port = os.getenv("API_POSTGRES_PORT")
db_user = os.getenv("API_POSTGRES_USERNAME")
db_database = os.getenv("API_POSTGRES_DATABASE")
db_password = os.getenv("API_POSTGRES_PASSWORD")
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
data_bucket_name = os.getenv("DATA_BUCKET_NAME")
path = 'import/base_data'


def load_csvs_into_tables(csv_file_list: list[dict]):
conn = psycopg2.connect(
host=os.getenv('API_POSTGRES_HOST'),
port=os.getenv('API_POSTGRES_PORT'),
database=os.getenv('API_POSTGRES_DATABASE'),
user=os.getenv('API_POSTGRES_USERNAME'),
password=os.getenv('API_POSTGRES_PASSWORD')
host=db_host,
port=db_port,
database=db_database,
user=db_user,
password=db_password
)
log.info('Loading base data CSVs into Database...')
cursor = conn.cursor()
Expand Down Expand Up @@ -67,13 +77,11 @@ def load_csvs_into_tables(csv_file_list: list[dict]):
def download_base_data(files_to_download: list[str]) -> list[str]:
downloaded_files = []
log.info(f"Downloading files base data files...")
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
try:
for file_name in files_to_download:
log.info(f"Downloading file: {file_name}")
s3.download_file(Bucket='landgriffon-raw-data', Key=f'import/base_data/{file_name}', Filename=file_name)
s3.download_file(Bucket=data_bucket_name, Key=f"{path}/{file_name}", Filename=file_name)
if os.path.exists(file_name):
downloaded_files.append(file_name)
else:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,32 @@
from dotenv import load_dotenv
import boto3

load_dotenv('../../.env')

import click
import pandas as pd
from psycopg2.extensions import connection
from psycopg2.pool import ThreadedConnectionPool

logging.basicConfig(level=logging.INFO)
log = logging.getLogger("indicator_coefficient_importer")
load_dotenv('../../.env')

db_host = os.getenv("API_POSTGRES_HOST")
db_port = os.getenv("API_POSTGRES_PORT")
db_user = os.getenv("API_POSTGRES_USERNAME")
db_database = os.getenv("API_POSTGRES_DATABASE")
db_password = os.getenv("API_POSTGRES_PASSWORD")
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
data_bucket_name = os.getenv("DATA_BUCKET_NAME")
path = 'import/indicator_coefficients'

postgres_thread_pool = ThreadedConnectionPool(
1,
50,
host=os.getenv("API_POSTGRES_HOST"),
port=os.getenv("API_POSTGRES_PORT"),
user=os.getenv("API_POSTGRES_USERNAME"),
password=os.getenv("API_POSTGRES_PASSWORD"),
host=db_host,
port=db_port,
database=db_database,
user=db_user,
password=db_password,
)


Expand Down Expand Up @@ -104,20 +113,16 @@ def copy_data_to_table(conn: connection, df: pd.DataFrame, indicator_id: str):

def download_indicator_coefficient_files(files_to_download: list[str]) -> list[str]:
log.info(f"Downloading files: {files_to_download}")
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
s3 = boto3.client('s3', aws_access_key_id=aws_access_key_id, aws_secret_access_key=aws_secret_access_key)
downloaded_files = []
print(files_to_download)
try:
for file in files_to_download:
file_name = file.split('/')[-1]
log.info(f"Downloading file: {file_name}")
s3.download_file(Bucket='landgriffon-raw-data', Key=file, Filename=file_name)
if os.path.exists(file_name):
downloaded_files.append(file_name)
log.info(f"Downloading file: {file}")
s3.download_file(Bucket=data_bucket_name, Key=f"{path}/{file}", Filename=file)
if os.path.exists(file):
downloaded_files.append(file)
else:
raise Exception(f"Error downloading file: {file_name}")
raise Exception(f"Error downloading file: {file}")
return downloaded_files
except Exception as e:
log.error(f"Error downloading files: {e}")
Expand All @@ -133,8 +138,8 @@ def download_indicator_coefficient_files(files_to_download: list[str]) -> list[s

def load_indicator_config() -> list[dict[str, Any]]:
try:
indicator_config_json = os.getenv('INDICATOR_COEFFICIENT_CONFIG')
print(indicator_config_json)
indicator_config_json = os.getenv("INDICATOR_COEFFICIENT_CONFIG")
logging.info(f"Loading indicator coefficient configuration: {indicator_config_json}")

if not indicator_config_json:
raise ValueError("Environment variable 'INDICATOR_COEFFICIENT_CONFIG' is missing or empty. Aborting.")
Expand Down Expand Up @@ -164,7 +169,7 @@ def main():

conn = postgres_thread_pool.getconn()
for indicator in indicator_config:
file = indicator['file'].split('/')[-1]
file = indicator['file']
year = indicator['year']
indicator_code = indicator['indicator_code']
data = load_data(filename=file, year=year)
Expand Down

0 comments on commit fd8e2fb

Please sign in to comment.