Setup DWH Synchronization (#1071)
* setup dwh synchronization jobs from S3 for ff lovac and from postgres for production

Signed-off-by: Raphaël Courivaud <[email protected]>

* remove extra logs

Signed-off-by: Raphaël Courivaud <[email protected]>

* update

Signed-off-by: Raphaël Courivaud <[email protected]>

* add sql and python linter

Signed-off-by: Raphaël Courivaud <[email protected]>

* change schemas for source in dbt models

Signed-off-by: Raphaël Courivaud <[email protected]>

* fixes

Signed-off-by: Raphaël Courivaud <[email protected]>

---------

Signed-off-by: Raphaël Courivaud <[email protected]>
rcourivaud authored Jan 9, 2025
1 parent 6278304 commit cc0c96d
Showing 282 changed files with 4,920 additions and 3,426 deletions.
7 changes: 7 additions & 0 deletions analytics/.gitignore
@@ -0,0 +1,7 @@
*.duckdb
.duckdb
*.db
.ipynb_checkpoints/
*.csv
.tmp
.log
1 change: 1 addition & 0 deletions analytics/dagster/.telemetry/id.yaml
@@ -0,0 +1 @@
instance_id: 3535e14f-e2a7-4b35-a902-c2e428480e12
36 changes: 24 additions & 12 deletions analytics/dagster/Dockerfile
@@ -1,32 +1,44 @@
FROM python:3.10-slim

RUN pip install dagster-webserver dagster-postgres dagster-aws
ARG ZLV_HTTP_USERNAME=zlv
ARG ZLV_HTTP_PASSWORD=zlv

RUN apt-get update && apt-get install -y nginx apache2-utils
ENV ZLV_HTTP_USERNAME=$ZLV_HTTP_USERNAME
ENV ZLV_HTTP_PASSWORD=$ZLV_HTTP_PASSWORD

RUN ls
# Install dependencies
COPY requirements.txt .
COPY --from=clevercloud/clever-tools /bin/clever /usr/local/bin/clever

# Install required dependencies
RUN apt-get update && \
    apt-get install -y nginx apache2-utils curl gnupg && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Install Clever Tools
RUN curl -sL https://clever-tools.clever-cloud.com/releases/latest/clever-tools-latest_linux.tar.gz | tar xz -C /usr/local/bin

# Install the required Python packages
RUN pip install dagster-webserver dagster-postgres dagster-aws

# Copy configuration files and source code
COPY requirements.txt .
RUN pip install -r requirements.txt

# Set the environment variable for Dagster
ENV DAGSTER_HOME=/opt/dagster/dagster_home/

RUN mkdir -p $DAGSTER_HOME

COPY dagster.yaml workspace.yaml $DAGSTER_HOME

COPY src/ $DAGSTER_HOME

# Set the working directory
WORKDIR $DAGSTER_HOME

# Setup Nginx configuration
RUN htpasswd -cb /etc/nginx/.htpasswd zlv zlv
# Configure basic authentication for Nginx
RUN htpasswd -cb /etc/nginx/.htpasswd $ZLV_HTTP_USERNAME $ZLV_HTTP_PASSWORD
COPY docker/nginx/nginx.conf /etc/nginx/sites-available/default

# Expose port 80 for the Nginx web server
# Expose port 8080 for the Nginx web server
EXPOSE 8080

# Define the startup command
CMD nginx -g 'daemon off;' & dagster-webserver -h 0.0.0.0 -p 3000

32 changes: 17 additions & 15 deletions analytics/dagster/dagster.yaml
@@ -1,20 +1,22 @@
# storage:
#   postgres:
#     postgres_db:
#       username:
#         env: DAGSTER_PG_USERNAME
#       password:
#         env: DAGSTER_PG_PASSWORD
#       hostname:
#         env: DAGSTER_PG_HOST
#       db_name:
#         env: DAGSTER_PG_DB
#       port:
#         env: DAGSTER_PG_PORT
storage:
  postgres:
    postgres_db:
      username:
        env: DAGSTER_PG_USERNAME
      password:
        env: DAGSTER_PG_PASSWORD
      hostname:
        env: DAGSTER_PG_HOST
      db_name:
        env: DAGSTER_PG_DB
      port: 5432


telemetry:
  enabled: false
  enabled: true

python_logs:
  python_log_level: INFO
  managed_python_loggers:
    - my_logger
    - my_logger

3 changes: 3 additions & 0 deletions analytics/dagster/logs/event.log
@@ -0,0 +1,3 @@
{"action": "step_success_event", "client_time": "2025-01-09 11:22:22.838835", "event_id": "7c11e930-235c-47a5-b4f6-1d724c01d08e", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "331c48224eff1759788e40174b635f328555bb04b1a4bbd1e7eac1dd2cf39354", "step_key_hash": "5310d1bc5da6c8b581cace1e131e053834acfbe8a4647d922e7cf17ce2e6b60d", "duration_ms": 250506.41840499884}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_start_event", "client_time": "2025-01-09 11:23:34.230866", "event_id": "f1fee084-6840-47f8-9f95-384e99cfe72e", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "075dffc13840fd3bd82e6f9da3e227d2efba9d25b67d44fdf1c81067d0fca22b", "step_key_hash": "f6cd558066b27dfbfa37ee3b6b7f9b33e26320d98fd68382dc210944693ff806"}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_success_event", "client_time": "2025-01-09 11:25:39.837169", "event_id": "ed9b910b-9508-4923-b945-8391067cad80", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "075dffc13840fd3bd82e6f9da3e227d2efba9d25b67d44fdf1c81067d0fca22b", "step_key_hash": "f6cd558066b27dfbfa37ee3b6b7f9b33e26320d98fd68382dc210944693ff806", "duration_ms": 125560.43714100088}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
12 changes: 11 additions & 1 deletion analytics/dagster/requirements.txt
@@ -1,11 +1,21 @@
dagster<w
dagster
dagster-dbt
dagster-duckdb
dagster-embedded-elt
GeoAlchemy2==0.16.0
dbt-duckdb
duckdb
matplotlib
pandas
requests
dlt[duckdb]
dlt[filesystem]
dlt[parquet]
s3fs

thefuzz==0.22.1





2 changes: 1 addition & 1 deletion analytics/dagster/src/__init__.py
@@ -1 +1 @@
from .definitions import defs as defs
from .definitions import defs as defs
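Note: `defs` is defined in `src/definitions.py`, which this diff does not touch. A minimal sketch of how the new assets and the two DuckDB resources might be wired together; the resource keys match the parameter names used by the assets below, while the database paths are illustrative assumptions:

# Hypothetical sketch of src/definitions.py (not shown in this diff).
from dagster import Definitions, load_assets_from_modules
from dagster_duckdb import DuckDBResource

from . import assets

defs = Definitions(
    assets=load_assets_from_modules([assets]),
    resources={
        # Keys "duckdb" and "duckdb_metabase" match the asset parameters;
        # the file paths here are illustrative, not from this commit.
        "duckdb": DuckDBResource(database="dagster.duckdb"),
        "duckdb_metabase": DuckDBResource(database="metabase.duckdb"),
    },
)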
12 changes: 10 additions & 2 deletions analytics/dagster/src/assets/__init__.py
@@ -1,3 +1,11 @@
from .production_dlt import dagster_production_assets
# from .production_dlt import dagster_production_assets
from .production_dbt import dbt_production_assets
from .clever import clevercloud_login_and_restart
from .dwh import __all__

__all__ = ["dagster_production_assets"]
# __all__ = ["dagster_production_assets"]
__all__ = [
    "dbt_production_assets",
    "clevercloud_login_and_restart"
]

3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/clever/__init__.py
@@ -0,0 +1,3 @@
from .restart import clevercloud_login_and_restart

__all__ = ["clevercloud_login_and_restart"]
16 changes: 16 additions & 0 deletions analytics/dagster/src/assets/clever/restart.py
@@ -0,0 +1,16 @@
import subprocess
from dagster import asset
from ...config import Config

@asset(group_name="deployment", deps=["upload_duckdb_to_s3"], name="clevercloud_login_and_restart")
def clevercloud_login_and_restart():

    # Log in to Clever Cloud using environment variables
    login_command = ["clever", "login", "--token", Config.CLEVER_TOKEN, "--secret", Config.CLEVER_SECRET]
    subprocess.run(login_command, check=True)

    # Restart the application
    restart_command = ["clever", "restart", "--app", Config.METABASE_APP_ID]
    subprocess.run(restart_command, check=True)

    return f"Application {Config.METABASE_APP_ID} has been restarted."
16 changes: 16 additions & 0 deletions analytics/dagster/src/assets/dwh/__init__.py
@@ -0,0 +1,16 @@
from .checks import check_ff_lovac_on_duckdb
from .copy import copy_dagster_duckdb_to_metabase_duckdb
from .ingest import import_postgres_data_from_replica_to_duckdb, import_cerema_ff_lovac_data_from_s3_to_duckdb, setup_replica_db, setup_s3_connection
from .upload import upload_duckdb_to_s3, upload_ff_to_s3, download_ff_from_s3

__all__ = [
    "check_ff_lovac_on_duckdb",
    "copy_dagster_duckdb_to_metabase_duckdb",
    "import_postgres_data_from_replica_to_duckdb",
    "import_cerema_ff_lovac_data_from_s3_to_duckdb",
    "setup_replica_db",
    "setup_s3_connection",
    "upload_duckdb_to_s3",
    "upload_ff_to_s3",
    "download_ff_from_s3",
]
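Note: the `.upload` module referenced above is outside this excerpt; only the asset name `upload_duckdb_to_s3` (via the deps of the Clever restart asset) and the Cellar credential attributes appear in this diff. A rough sketch of what it might do, assuming an s3fs-backed copy of the DuckDB file; the bucket name and group are illustrative:

# Hypothetical sketch of src/assets/dwh/upload/…; not the project's
# confirmed implementation.
import s3fs
from dagster import asset
from dagster_duckdb import DuckDBResource
from ....config import Config

@asset(name="upload_duckdb_to_s3", group_name="deployment")
def upload_duckdb_to_s3(duckdb_metabase: DuckDBResource):
    # Push the Metabase-facing DuckDB file to Cellar (S3-compatible storage)
    fs = s3fs.S3FileSystem(
        key=Config.CELLAR_ACCESS_KEY_ID,
        secret=Config.CELLAR_SECRET_ACCESS_KEY,
        client_kwargs={"endpoint_url": f"https://{Config.CELLAR_HOST_URL}"},
    )
    fs.put(duckdb_metabase.database, "s3://some-bucket/metabase.duckdb")  # bucket is illustrative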
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/dwh/checks/__init__.py
@@ -0,0 +1,3 @@
from .ff_table_exists import check_ff_lovac_on_duckdb

__all__ = ["check_ff_lovac_on_duckdb"]
32 changes: 32 additions & 0 deletions analytics/dagster/src/assets/dwh/checks/ff_table_exists.py
@@ -0,0 +1,32 @@
from dagster import AssetSpec, multi_asset
from ..ingest.queries.lovac import lovac_tables_sql
from ..ingest.queries.ff import ff_tables_sql
from dagster_duckdb import DuckDBResource


# Asset for uploading the DuckDB metabase file to S3
all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}

print([ f"check_{table_name}" for table_name in all_tables_sql.keys() ])


@multi_asset(
    specs=[
        AssetSpec(
            f"check_{table_name}",
            kinds={"duckdb"},
            deps=[f"build_{table_name}"],
            group_name="check",
        )
        for table_name in all_tables_sql.keys()
    ],
    can_subset=True,
)
def check_ff_lovac_on_duckdb(duckdb: DuckDBResource):
    query = "SELECT * FROM ff.lovac LIMIT 1;"
    with duckdb.get_connection() as conn:
        res = conn.execute(query)
        if res.fetchdf().empty:
            raise Exception("No data in ff.lovac table")
        else:
            return "Data found in ff.lovac table"
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/dwh/copy/__init__.py
@@ -0,0 +1,3 @@
from .copy_to_clean_duckdb import copy_dagster_duckdb_to_metabase_duckdb

__all__ = ["copy_dagster_duckdb_to_metabase_duckdb"]
62 changes: 62 additions & 0 deletions analytics/dagster/src/assets/dwh/copy/copy_to_clean_duckdb.py
@@ -0,0 +1,62 @@
from dagster import AssetKey
from ....config import RESULT_TABLES, translate_table_name
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset


source_schema = "main_marts"
destination_schema = "main"


def process_specific_table(
    table_name: str, duckdb: DuckDBResource, duckdb_metabase: DuckDBResource
):
    chemin_destination_db = (
        duckdb_metabase.database
    )  # Make sure this path is correct

    source_table_name = table_name
    destination_table_name = translate_table_name(table_name)
    with duckdb.get_connection() as conn:
        conn.execute(f"ATTACH '{chemin_destination_db}' AS destination_db;")
        conn.execute(
            f"""
            CREATE OR REPLACE TABLE destination_db.{destination_schema}.{destination_table_name} AS
            SELECT * FROM {source_schema}.{source_table_name};
            """
        )

        # Detach the source database
        # conn.execute("DETACH DATABASE source_db;")
        conn.execute("DETACH DATABASE destination_db;")


@multi_asset(
    specs=[
        AssetSpec(
            f"copy_{table_name}",
            kinds={"duckdb"},
            deps=[AssetKey(["marts", table_name])],
            group_name="copy",
        )
        for table_name in RESULT_TABLES
    ],
    can_subset=True,
)
def copy_dagster_duckdb_to_metabase_duckdb(
    context: AssetExecutionContext,
    duckdb: DuckDBResource,
    duckdb_metabase: DuckDBResource,
):

    for table_name in RESULT_TABLES:
        if (
            AssetKey(f"copy_{table_name}")
            in context.op_execution_context.selected_asset_keys
        ):
            process_specific_table(
                table_name=table_name, duckdb=duckdb, duckdb_metabase=duckdb_metabase
            )
            yield MaterializeResult(asset_key=f"copy_{table_name}")
        else:
            pass
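Note: `RESULT_TABLES` and `translate_table_name` are imported from `src/config.py`, which this excerpt omits. A plausible sketch; the table names and prefix rule are illustrative, only the call signature is taken from the code above:

# Hypothetical; the real list and mapping live in src/config.py.
RESULT_TABLES = [
    "marts_production_housing",  # illustrative dbt mart names, not from the diff
    "marts_production_owners",
]

def translate_table_name(table_name: str) -> str:
    # Map an internal marts table name to its Metabase-facing name.
    return table_name.removeprefix("marts_")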
9 changes: 9 additions & 0 deletions analytics/dagster/src/assets/dwh/ingest/__init__.py
@@ -0,0 +1,9 @@
from .ingest_lovac_ff_s3_asset import import_cerema_ff_lovac_data_from_s3_to_duckdb, setup_s3_connection
from .ingest_postgres_asset import import_postgres_data_from_replica_to_duckdb, setup_replica_db

__all__ = [
    "import_postgres_data_from_replica_to_duckdb",
    "import_cerema_ff_lovac_data_from_s3_to_duckdb",
    "setup_replica_db",
    "setup_s3_connection",
]
66 changes: 66 additions & 0 deletions analytics/dagster/src/assets/dwh/ingest/ingest_lovac_ff_s3_asset.py
@@ -0,0 +1,66 @@
from dagster import AssetKey, asset
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset
from ....config import Config
from .queries.lovac import lovac_tables_sql
from .queries.ff import ff_tables_sql


all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}


@asset(name="setup_s3_connection", description="Load all tables into DuckDB")
def setup_s3_connection(context, duckdb: DuckDBResource):
query = f"""
CREATE OR REPLACE PERSISTENT SECRET SECRET (
TYPE S3,
KEY_ID '{Config.CELLAR_ACCESS_KEY_ID}',
SECRET '{Config.CELLAR_SECRET_ACCESS_KEY}',
ENDPOINT '{Config.CELLAR_HOST_URL}',
REGION '{Config.CELLAR_REGION}'
);
"""
with duckdb.get_connection() as conn:
context.log.info(f"Executing SQL: {query}")
conn.execute(query)
schema_query = """
CREATE SCHEMA IF NOT EXISTS ff;
CREATE SCHEMA IF NOT EXISTS lovac;
"""
context.log.info(f"Executing SQL: {schema_query}")
conn.execute(schema_query)


def process_subset(name: str, context: AssetExecutionContext, duckdb: DuckDBResource):
    with duckdb.get_connection() as conn:

        command = all_tables_sql[name]
        context.log.info(f"Executing SQL: {command}")
        res = conn.execute(command)
        context.log.info(f"Result: {res.fetchdf()}")


@multi_asset(
    specs=[
        AssetSpec(
            f"build_{name}",
            deps=["setup_s3_connection"],
            kinds={"duckdb", "s3"},
            group_name="lovac" if "lovac" in name else "ff",
        )
        for name in all_tables_sql
    ],
    can_subset=True,
)
def import_cerema_ff_lovac_data_from_s3_to_duckdb(
    context: AssetExecutionContext, duckdb: DuckDBResource
):
    context.log.info("Importing data from replica to DuckDB")
    context.log.info("duckdb: " + duckdb.__str__())
    for name in all_tables_sql:
        if AssetKey(f"build_{name}") in context.op_execution_context.selected_asset_keys:
            context.log.info(f"Found {name} in selected_asset_keys")
            process_subset(name, context, duckdb)
            yield MaterializeResult(asset_key=f"build_{name}")
        else:
            pass
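Note: `lovac_tables_sql` and `ff_tables_sql` come from `queries/lovac.py` and `queries/ff.py`, which the truncated diff does not show. Their use above (`all_tables_sql[name]` fed to `conn.execute`, with S3 deps) implies dicts mapping table names to DuckDB statements; a minimal sketch under that assumption, with an illustrative object path:

# Hypothetical sketch of src/assets/dwh/ingest/queries/lovac.py; the
# table name and S3 path are illustrative, only the dict-of-SQL shape
# is implied by the diff.
lovac_tables_sql = {
    "lovac_2024": """
        CREATE OR REPLACE TABLE lovac.lovac_2024 AS
        SELECT * FROM read_parquet('s3://cellar-bucket/lovac/2024/*.parquet');
    """,
}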