Merge pull request #5 from cape-ph/capepy
refactor: use the new `capepy` library to simplify code
mehalter authored Nov 8, 2024
2 parents 4a561fd + ce20f93 commit 85d33ee
Showing 2 changed files with 6 additions and 57 deletions.
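At a glance, the refactor collapses the hand-written Glue/boto3 plumbing into capepy's `EtlJob` wrapper. Below is a minimal sketch of the resulting pattern, assembled from the added lines in the diff that follows; the placeholder cleaning step and the inline comment glosses are illustrative assumptions, not part of the commit.

```python
from pathlib import Path

from capepy.aws.glue import EtlJob

# Per this refactor, EtlJob stands in for the old template's hand-rolled
# parameter resolution, logging setup, and S3 client
etl_job = EtlJob()

# Contents of the raw file passed into the job
raw = etl_job.get_raw_file()

# Hypothetical stand-in for the template's TODO cleaning step
cleaned = raw

# Name the clean file after the OBJECT_KEY job parameter,
# swapping in the new file extension
clean_key = str(Path(etl_job.parameters["OBJECT_KEY"]).with_suffix(".csv"))

etl_job.write_clean_file(cleaned, clean_key=clean_key)
```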
main.py: 62 changes (5 additions, 57 deletions)
```diff
@@ -1,49 +1,13 @@
 """ETL script Template."""
 
-import sys
 from pathlib import Path
 
-import boto3 as boto3
-from awsglue.context import GlueContext
-from awsglue.utils import getResolvedOptions
-from pyspark.context import SparkContext
+from capepy.aws.glue import EtlJob
 
-# Initialize logging and context
-spark_ctx = SparkContext()
-glue_ctx = GlueContext(spark_ctx)
-logger = glue_ctx.get_logger()
-
-# Evaluate parameters
-parameters = getResolvedOptions(
-    sys.argv,
-    [
-        "RAW_BUCKET_NAME",
-        "ALERT_OBJ_KEY",
-        "CLEAN_BUCKET_NAME",
-    ],
-)
-raw_bucket = parameters["RAW_BUCKET_NAME"]
-raw_key = parameters["ALERT_OBJ_KEY"]
-clean_bucket = parameters["CLEAN_BUCKET_NAME"]
-
-# Retrieve the raw file passed into the ETL script
-# Fail nicely if there is an error and log it
-s3_client = boto3.client("s3")
-response = s3_client.get_object(Bucket=raw_bucket, Key=raw_key)
-status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-if status != 200:
-    err = (
-        f"ERROR - Could not get object {raw_key} from bucket "
-        f"{raw_bucket}. ETL Cannot continue."
-    )
-    logger.error(err)
-    raise Exception(err)
-
-logger.info(f"Obtained object {raw_key} from bucket {raw_bucket}.")
+etl_job = EtlJob()
 
 # `raw` has the contents of the raw file passed into the script
-raw = response.get("Body")
+raw = etl_job.get_raw_file()
 
 # TODO: Here you want to clean the contents of the `raw` variable
 # and produce the "cleaned" content to the `cleaned` variable
@@ -52,24 +16,8 @@
 # TODO: Specify the name of the new clean file
 # We typically just want to replace the file extension with a new one
 # Below is an example of this, update with the correct extension
-clean_key = str(Path(raw_key).with_suffix(".csv"))
+clean_key = str(Path(etl_job.parameters["OBJECT_KEY"]).with_suffix(".csv"))
 
 # Put the new cleaned object into the clean bucket
-if cleaned is not None:
-    response = s3_client.put_object(
-        Bucket=clean_bucket, Key=clean_key, Body=cleaned
-    )
-    status = response.get("ResponseMetadata", {}).get("HTTPStatusCode")
-
-    if status != 200:
-        err = (
-            f"ERROR - Could not write transformed data object {clean_key} "
-            f"to bucket {clean_bucket}. ETL Cannot continue."
-        )
-        logger.error(err)
-        raise Exception(err)
-
-    logger.info(
-        f"Transformed {raw_bucket}/{raw_key} and wrote result "
-        f"to {clean_bucket}/{clean_key}"
-    )
+etl_job.write_clean_file(cleaned, clean_key=clean_key)
```
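Note for template users: the diff elides the lines between the two hunks (old lines 50–51, new lines 14–15), where `cleaned` is produced from `raw`; that gap is left as-is here. Purely as an illustration of what the TODO section might hold, a hypothetical cleaning step, assuming `raw` arrives as UTF-8 tab-delimited bytes:

```python
import csv
import io

# Assumption: `raw` (from etl_job.get_raw_file()) is bytes holding
# tab-delimited text; adjust for the real input format
rows = csv.reader(io.StringIO(raw.decode("utf-8")), delimiter="\t")

# Re-serialize the rows as CSV to match the .csv clean_key above
buffer = io.StringIO()
csv.writer(buffer).writerows(rows)
cleaned = buffer.getvalue()
```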
requirements.txt: 1 change (1 addition, 0 deletions)
```diff
@@ -1,3 +1,4 @@
 aws-glue-libs @ git+https://github.com/awslabs/aws-glue-libs@9d8293962e6ffc607e5dc328e246f40b24010fa8
 boto3==1.34.103
+capepy>=1.0.0,<2.0.0
 pyspark==3.5.1
```
