Commit
Merge pull request #590 from nhsx/care-golive
Notebook connecting care golive
Showing 5 changed files with 74 additions and 296 deletions.
111 changes: 0 additions & 111 deletions
...ng_care_golive_supplier_website/dbrks_connecting_care_golive_supplier_website_snapshot.py
This file was deleted.
45 changes: 45 additions & 0 deletions
config/pipelines/direct-load/config_connecting_care_golive_website.json
@@ -0,0 +1,45 @@
{
  "metadata": {
    "products": [
      {
        "product_name": "Connecting Care",
        "granularity": "",
        "topic": "Connecting care golive website",
        "metrics": [
          {
            "id": "",
            "full_name": "Connecting care golive website",
            "short_name": "Connecting care golive website"
          }
        ]
      }
    ]
  },
  "pipeline": {
    "name": "trigger_Connecting_care_golive_website",
    "folder": "direct_load/",
    "ingestion": {
      "databricks_notebook": "/databricks/au-azure-databricks-cicd/direct-load/dbrks_Connecting_care_golive_website"
    },
    "raw": {
      "snapshot_source_path": "land/nhsengland/manual_upload/timestamp/excel/connecting_care_golive_supplier_website/"
    },
    "proc": {
      "sink_path": "proc/sources/connecting_care_golive_supplier_website/manual_upload/historical/",
      "sink_file": "connecting_care_golive_supplier_website.csv"
    },
    "staging": [
      {
        "metric": "Connecting care golive website",
        "sink_table": "connecting_care_golive_supplier_website"
      }

    ],
    "validation": [
      {
        "initial_validation": "/databricks/au-azure-databricks-cicd/validation/dbrks_connecting_care_golive_supplier_website/dbrks_connecting_care_golive_supplier_website",
        "final_validation": "/databricks/au-azure-databricks-cicd/validation/dbrks_count_rows_in_table"
      }
    ]
  }
}
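For orientation, a minimal sketch (not part of the commit) of how a notebook can pull the direct-load parameters out of this config once it has been fetched. The key paths come from the JSON above and the variable names mirror the notebook diff further down; the local file name is illustrative only, since the pipeline actually downloads the config from the datalake with datalake_download.

import json

# Illustrative: assumes the config above has been saved locally;
# in the pipeline it is fetched from the datalake instead.
with open("config_connecting_care_golive_website.json") as f:
    config_JSON = json.load(f)

# Parameters the direct-load notebook reads from the config.
new_source_path = config_JSON['pipeline']['raw']['snapshot_source_path']
sink_path = config_JSON['pipeline']['proc']['sink_path']
sink_file = config_JSON['pipeline']['proc']['sink_file']
table_name = config_JSON['pipeline']['staging'][0]['sink_table']

print(new_source_path, sink_path, sink_file, table_name)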
63 changes: 0 additions & 63 deletions
config/pipelines/nhsx-au-analytics/config_connecting_care_golive_website.json
This file was deleted.
@@ -1,30 +1,30 @@
 # Databricks notebook source
 #!/usr/bin python3

-# -------------------------------------------------------------------------
+# --------------------------------------------------------------------------
 # Copyright (c) 2021 NHS England and NHS Improvement. All rights reserved.
 # Licensed under the MIT License. See license.txt in the project root for
 # license information.
-# -------------------------------------------------------------------------
+# --------------------------------------------------------------------------

 """
-FILE: dbrks_connecting_care_golive_supplier_website_raw.py
+FILE: dbrks_connecting_care_golive_website.py
 DESCRIPTION:
-  Databricks notebook with code to run and check new raw
-  data for the NHSX Analytics unit metrics within the connecting care dashbord
+  Databricks notebook to for loading connecting care data for go live and websites
 USAGE:
 ...
-CONTRIBUTORS: Everistus Oputa
+CONTRIBUTORS: Abdu Nuhu
 CONTACT: [email protected]
-CREATED: 24.June. 2024
+CREATED: 25 June. 2024
 VERSION: 0.0.1
 """

 # COMMAND ----------

 # Install libs
 # -------------------------------------------------------------------------
-%pip install geojson==2.5.* tabulate requests pandas pathlib azure-storage-file-datalake beautifulsoup4 numpy urllib3 lxml regex pyarrow==5.0.* xlrd openpyxl python-dateutil
+%pip install geojson==2.5.* tabulate requests pandas pathlib azure-storage-file-datalake beautifulsoup4 numpy urllib3 lxml regex pyarrow==5.0.* xlrd openpyxl python-dateutil fastparquet

 # COMMAND ----------

@@ -36,6 +36,7 @@
 import tempfile
 from datetime import datetime
 import json
+import fastparquet

 # 3rd party:
 import pandas as pd

@@ -60,7 +61,7 @@

 # Load JSON config from Azure datalake
 # -------------------------------------------------------------------------
-file_path_config = "/config/pipelines/nhsx-au-analytics/"
+file_path_config = "/config/pipelines/direct-load/"
 file_name_config = "config_connecting_care_golive_website.json"
 file_system_config = dbutils.secrets.get(scope='AzureDataLake', key="DATALAKE_CONTAINER_NAME")
 config_JSON = datalake_download(CONNECTION_STRING, file_system_config, file_path_config, file_name_config)

@@ -71,55 +72,43 @@
 # Read parameters from JSON config
 # -------------------------------------------------------------------------
 file_system = dbutils.secrets.get(scope='AzureDataLake', key="DATALAKE_CONTAINER_NAME")
-new_source_path = config_JSON['pipeline']['raw']['source_path']
-historical_source_path = config_JSON['pipeline']['raw']['appended_path']
-historical_source_file = config_JSON['pipeline']['raw']['appended_file']
-sink_path = config_JSON['pipeline']['raw']['appended_path']
-sink_file = config_JSON['pipeline']['raw']['appended_file']
+new_source_path = config_JSON['pipeline']['raw']['snapshot_source_path']
+sink_path = config_JSON['pipeline']['proc']['sink_path']
+sink_file = config_JSON['pipeline']['proc']['sink_file']
+table_name = config_JSON['pipeline']["staging"][0]['sink_table']

+print(new_source_path)
+print(sink_path)
+print(sink_file)
+print(table_name)

 # COMMAND ----------

 # Pull new dataset
 # -------------------------
 latestFolder = datalake_latestFolder(CONNECTION_STRING, file_system, new_source_path)
+print(new_source_path)
+print(latestFolder)
 file_name_list = datalake_listContents(CONNECTION_STRING, file_system, new_source_path+latestFolder)
 file_name_list = [file for file in file_name_list if '.xlsx' in file]
 for new_source_file in file_name_list:
     new_dataset = datalake_download(CONNECTION_STRING, file_system, new_source_path+latestFolder, new_source_file)
-    new_dataframe = pd.read_excel(io.BytesIO(new_dataset), engine = 'openpyxl')
-    new_dataframe['Date']= pd.to_datetime(new_dataframe['Date']).dt.strftime('%Y-%m-%d')
-    new_dataframe = new_dataframe.loc[:, ~new_dataframe.columns.str.contains('^Unnamed')]
-    new_dataframe.columns = new_dataframe.columns.str.rstrip()
+    new_dataframe = pd.read_excel(io.BytesIO(new_dataset), sheet_name = "Summary", header = 0, engine='openpyxl')

 # COMMAND ----------

-# Pull historical dataset
-# -----------------------------------------------------------------------
-latestFolder = datalake_latestFolder(CONNECTION_STRING, file_system, historical_source_path)
-historical_dataset = datalake_download(CONNECTION_STRING, file_system, historical_source_path+latestFolder, historical_source_file)
-# historical_dataframe = pd.read_csv(io.BytesIO(historical_dataset))
-historical_dataframe = pd.read_csv(io.BytesIO(historical_dataset))
-historical_dataframe['Date'] = pd.to_datetime(historical_dataframe['Date']).dt.strftime('%Y-%m-%d')

-# Append new data to historical data
+#Upload hsitorical appended data to datalake
 # -----------------------------------------------------------------------
-dates_in_historical = historical_dataframe["Date"].unique().tolist()
-dates_in_new = new_dataframe["Date"].unique().tolist()[-1]
-if dates_in_new in dates_in_historical:
-    print('Data already exists in historical data')
-else:
-    historical_dataframe = new_dataframe
-    historical_dataframe = historical_dataframe.sort_values(by=['Date'])
-    historical_dataframe = historical_dataframe.reset_index(drop=True)

 # COMMAND ----------

-# Upload hsitorical appended data to datalake
 current_date_path = datetime.now().strftime('%Y-%m-%d') + '/'
 file_contents = io.BytesIO()
-historical_dataframe.to_csv(file_contents, index=False)
+df_processed.to_csv(file_contents)
 datalake_upload(file_contents, CONNECTION_STRING, file_system, sink_path+current_date_path, sink_file)

 # COMMAND ----------

+# Write metrics to database
+# -------------------------------------------------------------------------
+write_to_sql(df_processed, table_name, "overwrite")
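To make the new snapshot read easier to follow in isolation, here is a small self-contained sketch of the same step, under the assumption that a local .xlsx file stands in for the bytes that datalake_download returns and that pandas with openpyxl is installed; the file name is illustrative only. It reads the "Summary" sheet and serialises it to in-memory CSV, roughly what the loop above does before datalake_upload pushes the result to the proc sink path.

import io
import pandas as pd  # needs openpyxl installed for .xlsx support

# Illustrative stand-in for the bytes returned by datalake_download.
with open("connecting_care_golive_snapshot.xlsx", "rb") as f:
    new_dataset = f.read()

# Read the "Summary" sheet, as the updated notebook does.
new_dataframe = pd.read_excel(io.BytesIO(new_dataset), sheet_name="Summary", header=0, engine="openpyxl")

# Serialise to CSV in memory, ready to be uploaded to the sink path.
file_contents = io.BytesIO()
new_dataframe.to_csv(file_contents, index=False)
print(file_contents.getvalue().decode("utf-8")[:200])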