From bea64c605b0c75b9820b963797c305ec45b2c48a Mon Sep 17 00:00:00 2001 From: TeoCalvo Date: Tue, 23 Jul 2024 12:41:05 +0000 Subject: [PATCH 1/2] Fix: remove replaceWhere and add DELETE FROM --- src/gold/ingestao.ipynb | 9 +++++---- src/lib/ingestors.py | 10 ++++++++-- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/src/gold/ingestao.ipynb b/src/gold/ingestao.ipynb index 2f3a96e..3d77cdd 100644 --- a/src/gold/ingestao.ipynb +++ b/src/gold/ingestao.ipynb @@ -17,8 +17,6 @@ }, "outputs": [], "source": [ - "# dbutils.library.restartPython()\n", - "\n", "import tqdm\n", "import sys\n", "import datetime\n", @@ -34,7 +32,10 @@ "execution_count": 0, "metadata": { "application/vnd.databricks.v1+cell": { - "cellMetadata": {}, + "cellMetadata": { + "byteLimit": 2048000, + "rowLimit": 10000 + }, "inputWidgets": {}, "nuid": "77e72b06-4150-4803-b6ae-be2b8864683c", "showTitle": true, @@ -88,7 +89,7 @@ "language": "python", "notebookMetadata": { "mostRecentlyExecutedCommandWithImplicitDF": { - "commandId": -1, + "commandId": 1263580614924770, "dataframes": [ "_sqldf" ] diff --git a/src/lib/ingestors.py b/src/lib/ingestors.py index 9517d11..608ad68 100644 --- a/src/lib/ingestors.py +++ b/src/lib/ingestors.py @@ -163,13 +163,19 @@ def load(self, **kwargs): return df def save(self, df, dt_ref): + self.spark.sql(f"DELETE FROM {self.table} WHERE dtRef = '{dt_ref}'") + (df.write - .mode("overwrite") - .option("replaceWhere", f"dtRef = '{dt_ref}'") + .mode("append") .saveAsTable(self.table)) def backfill(self, dt_start, dt_stop): dates = utils.date_range(dt_start, dt_stop) + + if not utils.table_exists(self.spark, self.catalog, self.schemaname, self.tablename): + df = self.load(dt_ref=dates.pop(0)) + df.write.saveAsTable(self.table) + for dt in tqdm.tqdm(dates): df = self.load(dt_ref=dt) self.save(df=df, dt_ref=dt) From 83577b2773d943909857d7cc6bf940f9103dffb5 Mon Sep 17 00:00:00 2001 From: TeoCalvo Date: Tue, 23 Jul 2024 12:43:01 +0000 Subject: [PATCH 2/2] Change notebook format --- src/gold/ingestao.ipynb | 105 ---------------------------------------- src/gold/ingestao.py | 33 +++++++++++++ 2 files changed, 33 insertions(+), 105 deletions(-) delete mode 100644 src/gold/ingestao.ipynb create mode 100644 src/gold/ingestao.py diff --git a/src/gold/ingestao.ipynb b/src/gold/ingestao.ipynb deleted file mode 100644 index 3d77cdd..0000000 --- a/src/gold/ingestao.ipynb +++ /dev/null @@ -1,105 +0,0 @@ -{ - "cells": [ - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "08d32573-1e2c-464a-a7d2-691777fdb326", - "showTitle": true, - "title": "IMPORTS" - } - }, - "outputs": [], - "source": [ - "import tqdm\n", - "import sys\n", - "import datetime\n", - "\n", - "sys.path.insert(0, \"../lib\")\n", - "\n", - "import utils\n", - "import ingestors" - ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "77e72b06-4150-4803-b6ae-be2b8864683c", - "showTitle": true, - "title": "SETUP" - } - }, - "outputs": [], - "source": [ - "catalog = \"gold\"\n", - "schemaname = 'upsell'\n", - "tablename = dbutils.widgets.get(\"tablename\")\n", - "\n", - "start = dbutils.widgets.get(\"dt_start\") # now\n", - "stop = dbutils.widgets.get(\"dt_stop\") # now\n", - "\n", - "if start == datetime.datetime.now().strftime('%Y-%m-%d'):\n", - " start = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime(\"%Y-%m-%d\")\n" - ] - }, - { - "cell_type": "code", - "execution_count": 0, - "metadata": { - "application/vnd.databricks.v1+cell": { - "cellMetadata": { - "byteLimit": 2048000, - "implicitDf": true, - "rowLimit": 10000 - }, - "inputWidgets": {}, - "nuid": "f6975fe5-34e3-4b80-93fa-5c43ac282245", - "showTitle": false, - "title": "" - } - }, - "outputs": [], - "source": [ - "ingestor = ingestors.IngestorCubo(spark=spark,\n", - " catalog=catalog,\n", - " schemaname=schemaname,\n", - " tablename=tablename)\n", - "\n", - "ingestor.backfill(start, stop)" - ] - } - ], - "metadata": { - "application/vnd.databricks.v1+notebook": { - "dashboards": [], - "environmentMetadata": null, - "language": "python", - "notebookMetadata": { - "mostRecentlyExecutedCommandWithImplicitDF": { - "commandId": 1263580614924770, - "dataframes": [ - "_sqldf" - ] - }, - "pythonIndentUnit": 4 - }, - "notebookName": "ingestao", - "widgets": {} - } - }, - "nbformat": 4, - "nbformat_minor": 0 -} diff --git a/src/gold/ingestao.py b/src/gold/ingestao.py new file mode 100644 index 0000000..89d5f2f --- /dev/null +++ b/src/gold/ingestao.py @@ -0,0 +1,33 @@ +# Databricks notebook source +# DBTITLE 1,IMPORTS +import tqdm +import sys +import datetime + +sys.path.insert(0, "../lib") + +import utils +import ingestors + +# COMMAND ---------- + +# DBTITLE 1,SETUP +catalog = "gold" +schemaname = 'upsell' +tablename = dbutils.widgets.get("tablename") + +start = dbutils.widgets.get("dt_start") # now +stop = dbutils.widgets.get("dt_stop") # now + +if start == datetime.datetime.now().strftime('%Y-%m-%d'): + start = (datetime.datetime.now() - datetime.timedelta(days=1)).strftime("%Y-%m-%d") + + +# COMMAND ---------- + +ingestor = ingestors.IngestorCubo(spark=spark, + catalog=catalog, + schemaname=schemaname, + tablename=tablename) + +ingestor.backfill(start, stop)