Skip to content

Commit

Permalink
Merge pull request #27 from TeoMeWhy/fix
Browse files (browse the repository at this point in the history)
Fix
  • Loading branch information
TeoCalvo authored Jul 23, 2024
2 parents 1814a01 + 83577b2 commit e264748
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 106 deletions.
104 changes: 0 additions & 104 deletions src/gold/ingestao.ipynb

This file was deleted.

33 changes: 33 additions & 0 deletions src/gold/ingestao.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Databricks notebook source
# DBTITLE 1,IMPORTS
import tqdm
import sys
import datetime

sys.path.insert(0, "../lib")

import utils
import ingestors

# COMMAND ----------

# DBTITLE 1,SETUP
# Coordinates of the target table; the table name comes from a notebook widget.
catalog = "gold"
schemaname = 'upsell'
tablename = dbutils.widgets.get("tablename")

# Bounds of the processing window, read from the notebook widgets.
start = dbutils.widgets.get("dt_start") # now
stop = dbutils.widgets.get("dt_stop") # now

# Take a single snapshot of today's date so the comparison and the
# "yesterday" computation cannot disagree if the run straddles midnight.
today = datetime.date.today()

# When the requested start is today, begin one day earlier instead.
if start == today.isoformat():
    start = (today - datetime.timedelta(days=1)).isoformat()


# COMMAND ----------

# Build the IngestorCubo for the configured catalog/schema/table and run
# its backfill over the start/stop values resolved in the setup cell.
ingestor = ingestors.IngestorCubo(
    spark=spark,
    catalog=catalog,
    schemaname=schemaname,
    tablename=tablename,
)

ingestor.backfill(start, stop)
10 changes: 8 additions & 2 deletions src/lib/ingestors.py
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,19 @@ def load(self, **kwargs):
return df

def save(self, df, dt_ref):
    """Replace the rows stored for ``dt_ref`` in ``self.table`` with ``df``.

    Rows whose ``dtRef`` equals ``dt_ref`` are deleted first and ``df`` is
    then appended, so saving the same date again does not stack duplicate
    rows (assuming ``df`` only holds rows for ``dt_ref`` -- TODO confirm
    against ``load``).

    Args:
        df: DataFrame to append to ``self.table``.
        dt_ref: Reference-date value matched against the ``dtRef`` column.
    """
    # NOTE(review): dt_ref is interpolated straight into the SQL text; this
    # is only safe while callers pass trusted date strings.
    self.spark.sql(f"DELETE FROM {self.table} WHERE dtRef = '{dt_ref}'")

    # A single write mode: the old rows were removed by the DELETE above,
    # so a plain append is enough. The earlier "overwrite" mode call was
    # overridden by the later "append" call, and the replaceWhere option
    # duplicated the DELETE.
    (df.write
       .mode("append")
       .saveAsTable(self.table))

def backfill(self, dt_start, dt_stop):
    """Load and save every date produced by ``utils.date_range``.

    If the target table does not exist yet, the first date is loaded and
    written with a plain ``saveAsTable`` to create it; every remaining date
    goes through ``self.save``.

    Args:
        dt_start: Start of the range, passed to ``utils.date_range``.
        dt_stop: End of the range, passed to ``utils.date_range``.
    """
    # Materialize into a local list so the slicing below never touches an
    # object owned by the helper.
    dates = list(utils.date_range(dt_start, dt_stop))

    # Nothing to process: previously this raised IndexError from pop(0)
    # whenever the range was empty and the table did not exist yet.
    if not dates:
        return

    if not utils.table_exists(self.spark, self.catalog, self.schemaname, self.tablename):
        # The first date creates the table; it is removed from the list so
        # the loop below does not write it a second time.
        first_dt, *dates = dates
        df = self.load(dt_ref=first_dt)
        df.write.saveAsTable(self.table)

    for dt in tqdm.tqdm(dates):
        df = self.load(dt_ref=dt)
        self.save(df=df, dt_ref=dt)

0 comments on commit e264748

Please sign in to comment.