From ce20f93e64b3d1a212e8197ddcc99d62eeed2028 Mon Sep 17 00:00:00 2001 From: Micah Halter Date: Fri, 8 Nov 2024 14:30:07 -0500 Subject: [PATCH] refactor: use the new `capepy` library to simplify code --- main.py | 62 ++++-------------------------------------------- requirements.txt | 1 + 2 files changed, 6 insertions(+), 57 deletions(-) diff --git a/main.py b/main.py index 475833b..ee7d4b9 100644 --- a/main.py +++ b/main.py @@ -1,49 +1,13 @@ """ETL script Template.""" -import sys from pathlib import Path -import boto3 as boto3 -from awsglue.context import GlueContext -from awsglue.utils import getResolvedOptions -from pyspark.context import SparkContext +from capepy.aws.glue import EtlJob -# Initialize logging and context -spark_ctx = SparkContext() -glue_ctx = GlueContext(spark_ctx) -logger = glue_ctx.get_logger() - -# Evaluate parameters -parameters = getResolvedOptions( - sys.argv, - [ - "RAW_BUCKET_NAME", - "ALERT_OBJ_KEY", - "CLEAN_BUCKET_NAME", - ], -) -raw_bucket = parameters["RAW_BUCKET_NAME"] -raw_key = parameters["ALERT_OBJ_KEY"] -clean_bucket = parameters["CLEAN_BUCKET_NAME"] - -# Retrieve the raw file passed into the ETL script -# Fail nicely if there is an error and log it -s3_client = boto3.client("s3") -response = s3_client.get_object(Bucket=raw_bucket, Key=raw_key) -status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") - -if status != 200: - err = ( - f"ERROR - Could not get object {raw_key} from bucket " - f"{raw_bucket}. ETL Cannot continue." - ) - logger.error(err) - raise Exception(err) - -logger.info(f"Obtained object {raw_key} from bucket {raw_bucket}.") +etl_job = EtlJob() # `raw` has the contents of the raw file passed into the script -raw = response.get("Body") +raw = etl_job.get_raw_file() # TODO: Here you want to clean the contents of the `raw` variable # and produce the "cleaned" content to the `cleaned` variable @@ -52,24 +16,8 @@ # TODO: Specify the name of the new clean file # We typically just want to replace the file extension with a new one # Below is an example of this, update with the correct extension -clean_key = str(Path(raw_key).with_suffix(".csv")) +clean_key = str(Path(etl_job.parameters["OBJECT_KEY"]).with_suffix(".csv")) # Put the new cleaned object into the clean bucket if cleaned is not None: - response = s3_client.put_object( - Bucket=clean_bucket, Key=clean_key, Body=cleaned - ) - status = response.get("ResponseMetadata", {}).get("HTTPStatusCode") - - if status != 200: - err = ( - f"ERROR - Could not write transformed data object {clean_key} " - f"to bucket {clean_bucket}. ETL Cannot continue." - ) - logger.error(err) - raise Exception(err) - - logger.info( - f"Transformed {raw_bucket}/{raw_key} and wrote result " - f"to {clean_bucket}/{clean_key}" - ) + etl_job.write_clean_file(cleaned, clean_key=clean_key) diff --git a/requirements.txt b/requirements.txt index b4921e9..91825bf 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,4 @@ aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8 boto3==1.34.103 +capepy>=1.0.0,<2.0.0 pyspark==3.5.1