Skip to content

Commit

Permalink
Merge branch 'main' into br_mec_sisu
Browse files Browse the repository at this point in the history
  • Loading branch information
laura-l-amaral authored May 10, 2024
2 parents 2cd8374 + ba7920d commit fcb7903
Show file tree
Hide file tree
Showing 281 changed files with 72,906 additions and 4,162 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/cd.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -98,4 +98,4 @@ jobs:
run: poetry install --only=dev
- name: Run script for changing metadata status
run: |-
python .github/workflows/scripts/change_metadata_status.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --status published --email ${{ secrets.BACKEND_EMAIL }} --password ${{ secrets.BACKEND_PASSWORD }}
poetry run python .github/workflows/scripts/change_metadata_status.py --modified-files ${{ steps.changed-files.outputs.all_modified_files }} --graphql-url ${{ secrets.BACKEND_GRAPHQL_URL }} --status published --email ${{ secrets.BACKEND_EMAIL }} --password ${{ secrets.BACKEND_PASSWORD }}
34 changes: 34 additions & 0 deletions .github/workflows/check-bq-project-name.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
---
# CI guard: fail a pull request when any changed .sql file still references
# the dev BigQuery project, by running scripts/check_sql_files.py on each file.
name: Check BQ project name
on:
  workflow_dispatch:
  pull_request:
    paths: ['**/*.sql']
jobs:
  check_bucket_name:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout code
        # NOTE(review): actions/checkout@v2 is deprecated — consider upgrading to v4.
        uses: actions/checkout@v2
      - name: Get changed files
        id: get_files
        # Produces a shell-quoted list of changed files in steps.get_files.outputs.pr_files.
        uses: dorny/paths-filter@v2
        with:
          list-files: shell
          # NOTE(review): filter matches '**' (all files) even though the workflow
          # only triggers on '**/*.sql'; the shell loop below re-filters by extension.
          filters: |
            pr:
              - added|deleted|modified: '**'
      - name: Install Python
        # NOTE(review): actions/setup-python@v2 is deprecated — consider upgrading to v5.
        uses: actions/setup-python@v2
        with:
          python-version: 3.x
      - name: Run Python script
        # The checker script exits 1 when it finds 'basedosdados-dev' in a file,
        # which fails this step (and the job).
        run: |-
          for file in ${{ steps.get_files.outputs.pr_files }}; do
            if [[ $file == *.sql ]]; then
              echo "SQL file detected: $file"
              python .github/workflows/scripts/check_sql_files.py $file
            else
              echo "Não é um arquivo SQL: $file"
            fi
          done
2 changes: 1 addition & 1 deletion .github/workflows/ci-dbt.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
name: CI dbt
on:
pull_request:
paths: ['**.sql', '**.yaml']
paths: ['**.sql', '**.yaml', '**.yml']
jobs:
lint:
name: Lint dbt
Expand Down
36 changes: 36 additions & 0 deletions .github/workflows/elementary.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
# Builds the Elementary data-observability report for the dbt project and
# publishes it to the public GCS bucket; the report and edr log are also
# kept as workflow artifacts.
name: Deploy Elementary Report
on:
  # Rebuild on every push to the default branches...
  push:
    branches: [main, master]
  # ...and every weekday at 22:00 UTC; can also be triggered manually.
  schedule:
    # NOTE(review): cron expressions are conventionally quoted ('00 22 * * 1-5')
    # to avoid YAML plain-scalar surprises — confirm this parses as intended.
    - cron: 00 22 * * 1-5
  workflow_dispatch:
jobs:
  elementary:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout dbt project
        uses: actions/checkout@v3
      - name: Run Elementary
        uses: elementary-data/[email protected]
        with:
          warehouse-type: bigquery
          adapter-version: 1.5.9
          profiles-yml: ${{ secrets.ELEMENTARY_PROFILES_YML }}
          # Generate the HTML report, then upload it to the bucket and update
          # the bucket website so the latest report is served.
          edr-command: edr report --file-path "report.html" && edr send-report --google-service-account-path
            "/tmp/gcs_keyfile.json" --gcs-bucket-name "basedosdados" --update-bucket-website
            "true"
          bigquery-keyfile: ${{ secrets.BIGQUERY_KEYFILE }}
          gcs-keyfile: ${{ secrets.GCS_KEYFILE }}
      - name: Upload report
        uses: actions/upload-artifact@v3
        with:
          name: report.html
          path: report.html
      - name: Upload log
        # Keep the edr log artifact even when the report step fails.
        if: always()
        uses: actions/upload-artifact@v3
        with:
          name: edr.log
          path: edr.log
24 changes: 24 additions & 0 deletions .github/workflows/scripts/check_sql_files.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import argparse
import os

def check_sql_files(file, pattern="basedosdados-dev"):
    """Return True if *pattern* occurs anywhere in the given SQL file.

    Args:
        file: Path to the file to scan. Files that do not exist or do not
            end in ".sql" are skipped and reported as clean.
        pattern: Substring to search for. Defaults to the dev BigQuery
            project name, which must not leak into committed SQL models.

    Returns:
        bool: True when the pattern was found on some line, False otherwise.
    """
    # Non-existent or non-SQL paths are not an error: the CI loop passes
    # every changed file through, so just report them as clean.
    if not (os.path.exists(file) and file.endswith(".sql")):
        return False
    with open(file, "r") as f:
        # Stream line by line instead of readlines(): same result, no
        # need to hold the whole file in memory.
        for line in f:
            if pattern in line:
                print(f"Found '{pattern}' in {file}")
                return True
    return False

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Check for 'basedosdados-dev' occurrences in SQL files.")
    parser.add_argument("file", help="Path to the SQL file to check")
    args = parser.parse_args()

    # Exit non-zero so the CI step (and job) fails when the dev project
    # name is present in the file.
    if check_sql_files(args.file):
        exit(1)
    else:
        # Fix: the message previously said 'basedosdados-staging' even though
        # the scan looks for 'basedosdados-dev'.
        print("No occurrences of 'basedosdados-dev' found in SQL files.")
20 changes: 10 additions & 10 deletions .github/workflows/scripts/table_approve.py
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ def push_table_to_bq(
Dataset(dataset_id).update(mode="prod")
delete_storage_path = file_path.replace("./downloaded_data/", "")
print(
f"DELETE HEADER FILE FROM basedosdados/staing/{dataset_id}_staging/{table_id}/{delete_storage_path}"
f"DELETE HEADER FILE FROM basedosdados/staging/{dataset_id}_staging/{table_id}/{delete_storage_path}"
)
st = Storage(dataset_id=dataset_id, table_id=table_id)
st.delete_file(filename=delete_storage_path, mode="staging")
Expand Down Expand Up @@ -146,27 +146,27 @@ def save_header_files(dataset_id, table_id):
print("Found blob: ", str(blob.name))
print("Renamed blob: ", blob_path)
break
### save table header in storage

print(f"DOWNLOAD HEADER FILE FROM basedosdados-dev.{dataset_id}_staging.{table_id}")
query = f"""
SELECT * FROM `basedosdados-dev.{dataset_id}_staging.{table_id}` LIMIT 1
"""
df = bd.read_sql(query, billing_project_id="basedosdados", from_file=True)
df = df.drop(columns=partitions)

file_name = blob_path.split("/")[-1]
file_type = file_name.split(".")[-1]

path = Path(blob_path.replace(f"/{file_name}", ""))
path.mkdir(parents=True, exist_ok=True)

### save table header in storage
if file_type == "csv":
print(f"DOWNLOAD HEADER FILE FROM basedosdados-dev.{dataset_id}_staging.{table_id}")
query = f"""
SELECT * FROM `basedosdados-dev.{dataset_id}_staging.{table_id}` LIMIT 1
"""
df = bd.read_sql(query, billing_project_id="basedosdados", from_file=True)
df = df.drop(columns=partitions)

file_path = f"./{path}/table_approve_temp_file_271828.csv"
df.to_csv(file_path, index=False)
elif file_type == "parquet":
file_path = f"./{path}/table_approve_temp_file_271828.parquet"
df.to_parquet(file_path)
blob.download_to_filename(file_path)
print("SAVE HEADER FILE: ", file_path)
return file_path

Expand Down
236 changes: 236 additions & 0 deletions .github/workflows/scripts/table_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
from argparse import ArgumentParser
from time import sleep
import re
from backend import Backend
from utils import expand_alls, get_datasets_tables_from_modified_files


def get_flow_run_state(flow_run_id: str, backend: Backend, auth_token: str):
    """Return the current Prefect state string for a single flow run.

    Args:
        flow_run_id: UUID of the flow run to inspect.
        backend: GraphQL client pointed at the Prefect API.
        auth_token: Bearer token used to authorize the query.
    """
    # Primary-key lookup; only the `state` field is requested.
    state_query = """
        query ($flow_run_id: uuid!) {
            flow_run_by_pk (id: $flow_run_id) {
                state
            }
        }
    """
    auth_header = {"Authorization": f"Bearer {auth_token}"}
    result = backend._execute_query(
        state_query,
        variables={"flow_run_id": flow_run_id},
        headers=auth_header,
    )
    return result["flow_run_by_pk"]["state"]

def get_flow_status_logs(flow_run_id: str, backend: Backend, auth_token: str):
    """Extract the dbt summary counters from a flow run's "Done." log line.

    Queries the Prefect log table for the message containing "Done." (the
    dbt run summary, e.g. "Done. PASS=12 WARN=0 ERROR=0 SKIP=3 TOTAL=15")
    and parses the PASS/SKIP/WARN counters out of it.

    Args:
        flow_run_id: UUID of the flow run whose logs are searched.
        backend: GraphQL client pointed at the Prefect API.
        auth_token: Bearer token used to authorize the query.

    Returns:
        dict: {'pass': int, 'skip': int, 'warn': int}.
    """
    query = """query ($flow_run_id: uuid!){
        log(where:{
            flow_run_id:{_eq:$flow_run_id},
            message:{_like:"%Done.%"}}){
            message
        }
    }"""
    response = backend._execute_query(
        query,
        variables={"flow_run_id": flow_run_id},
        headers={"Authorization": f"Bearer {auth_token}"},
    )
    print(response)
    # NOTE(review): GraphQL list queries normally return `log` as a list of
    # rows; response['log']['message'] only works if the client unwraps a
    # single row — confirm against Backend._execute_query.
    message = response['log']['message']
    # Fix: patterns were non-raw strings ("PASS=\d+"), which is an invalid
    # escape sequence (SyntaxWarning on modern Python); the parsing was also
    # written out three times. Use raw patterns and a single loop.
    result = {}
    for counter in ("pass", "skip", "warn"):
        result[counter] = int(
            re.search(rf"{counter.upper()}=(\d+)", message).group(1)
        )
    return result


def get_materialization_flow_id(backend: Backend, auth_token: str):
    """Return the id of the active "BD template: Executa DBT model" flow.

    Looks up the non-archived flow with that name in the Prefect project
    named "main" and returns the id of the first match.

    Args:
        backend: GraphQL client pointed at the Prefect API.
        auth_token: Bearer token used to authorize the query.
    """
    flow_query = """
    query {
        flow (where: {
            name: {
                _like: "BD template: Executa DBT model"
            },
            archived: {
                _eq: false
            },
            project: {
                name: {_eq: "main"}
            }
        }) {
            id
        }
    }
    """
    auth_header = {"Authorization": f"Bearer {auth_token}"}
    data = backend._execute_query(flow_query, headers=auth_header)
    # The query may match several flows; keep the first one.
    return data["flow"][0]["id"]


# CI entry point: for every table touched by the modified files, launch the
# "BD template: Executa DBT model" Prefect flow (by default with
# dbt_command="test") and block until all launched runs finish, failing if
# any run does not end in "Success".
if __name__ == "__main__":
    # Start argument parser
    arg_parser = ArgumentParser()

    # Add GraphQL URL argument
    arg_parser.add_argument(
        "--graphql-url",
        type=str,
        required=True,
        help="URL of the GraphQL endpoint.",
    )

    # Add list of modified files argument (comma-separated, split below)
    arg_parser.add_argument(
        "--modified-files",
        type=str,
        required=True,
        help="List of modified files.",
    )

    # Add Prefect backend URL argument
    arg_parser.add_argument(
        "--prefect-backend-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org/api",
        help="Prefect backend URL.",
    )

    # Add prefect base URL argument (used only to build human-readable links)
    arg_parser.add_argument(
        "--prefect-base-url",
        type=str,
        required=False,
        default="https://prefect.basedosdados.org",
        help="Prefect base URL.",
    )

    # Add Prefect API token argument
    arg_parser.add_argument(
        "--prefect-backend-token",
        type=str,
        required=True,
        help="Prefect backend token.",
    )

    # Add materialization mode argument
    arg_parser.add_argument(
        "--materialization-mode",
        type=str,
        required=False,
        default="dev",
        help="Materialization mode.",
    )

    # Add materialization label argument (Prefect agent label for the run)
    arg_parser.add_argument(
        "--materialization-label",
        type=str,
        required=False,
        default="basedosdados-dev",
        help="Materialization label.",
    )

    # Add dbt command label argument
    # NOTE(review): help text appears copy-pasted from the previous argument;
    # it presumably should describe the dbt command ("test") instead.
    arg_parser.add_argument(
        "--dbt-command",
        type=str,
        required=False,
        default = "test",
        help="Materialization label.",
    )

    # Get arguments
    args = arg_parser.parse_args()

    # Get datasets and tables from modified files
    modified_files = args.modified_files.split(",")
    datasets_tables = get_datasets_tables_from_modified_files(
        modified_files, show_details=True
    )
    # Split deleted datasets and tables; only existing ones are processed below
    deleted_datasets_tables = []
    existing_datasets_tables = []
    for dataset_id, table_id, exists, alias in datasets_tables:
        if exists:
            existing_datasets_tables.append((dataset_id, table_id, alias))
        else:
            deleted_datasets_tables.append((dataset_id, table_id, alias))
    # Expand `__all__` tables into the dataset's concrete table ids
    backend = Backend(args.graphql_url)
    expanded_existing_datasets_tables = []
    for dataset_id, table_id, alias in existing_datasets_tables:
        expanded_table_ids = expand_alls(dataset_id, table_id, backend)
        for expanded_dataset_id, expanded_table_id in expanded_table_ids:
            expanded_existing_datasets_tables.append(
                (expanded_dataset_id, expanded_table_id, alias)
            )
    existing_datasets_tables = expanded_existing_datasets_tables

    # Launch materialization flows
    # NOTE(review): `backend` is rebound here from the metadata GraphQL
    # endpoint to the Prefect API endpoint; the earlier instance is no
    # longer used past this point.
    backend = Backend(args.prefect_backend_url)
    flow_id = get_materialization_flow_id(backend, args.prefect_backend_token)
    launched_flow_run_ids = []
    for dataset_id, table_id, alias in existing_datasets_tables:
        print(
            f"Launching materialization flow for {dataset_id}.{table_id} (alias={alias})..."
        )
        parameters = {
            "dataset_id": dataset_id,
            "dbt_alias": alias,
            "mode": args.materialization_mode,
            "table_id": table_id,
            "dbt_command": args.dbt_command
        }

        mutation = """
            mutation ($flow_id: UUID, $parameters: JSON, $label: String!) {
                create_flow_run (input: {
                    flow_id: $flow_id,
                    parameters: $parameters,
                    labels: [$label],
                }) {
                    id
                }
            }
        """
        variables = {
            "flow_id": flow_id,
            "parameters": parameters,
            "label": args.materialization_label,
        }

        response = backend._execute_query(
            mutation,
            variables,
            headers={"Authorization": f"Bearer {args.prefect_backend_token}"},
        )

        flow_run_id = response["create_flow_run"]["id"]
        launched_flow_run_ids.append(flow_run_id)
        flow_run_url = f"{args.prefect_base_url}/flow-run/{flow_run_id}"
        print(f" - Materialization flow run launched: {flow_run_url}")

    # Keep monitoring the launched flow runs until they are finished
    for launched_flow_run_id in launched_flow_run_ids:
        print(f"Monitoring flow run {launched_flow_run_id}...")
        flow_run_state = get_flow_run_state(
            flow_run_id=launched_flow_run_id,
            backend=backend,
            auth_token=args.prefect_backend_token,
        )
        # Poll every 5 seconds until the run reaches a terminal state
        while flow_run_state not in ["Success", "Failed", "Cancelled"]:
            sleep(5)
            flow_run_state = get_flow_run_state(
                flow_run_id=launched_flow_run_id,
                backend=backend,
                auth_token=args.prefect_backend_token,
            )
        # Any non-Success terminal state fails the CI job immediately
        if flow_run_state != "Success":
            raise Exception(
                f'Flow run {launched_flow_run_id} finished with state "{flow_run_state}". '
                f"Check the logs at {args.prefect_base_url}/flow-run/{launched_flow_run_id}"
            )
        else:
            print("Congrats! Everything seems fine!")
Loading

0 comments on commit fcb7903

Please sign in to comment.