Skip to content

Commit

Permalink
fixes
Browse files Browse the repository at this point in the history
Signed-off-by: Raphaël Courivaud <[email protected]>
  • Loading branch information
rcourivaud committed Jan 9, 2025
1 parent d186b7e commit 0cc2b10
Show file tree
Hide file tree
Showing 60 changed files with 391 additions and 243 deletions.
10 changes: 9 additions & 1 deletion analytics/dagster/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
FROM python:3.10-slim

ARG ZLV_HTTP_USERNAME=zlv
ARG ZLV_HTTP_PASSWORD=zlv

ENV ZLV_HTTP_USERNAME=$ZLV_HTTP_USERNAME
ENV ZLV_HTTP_PASSWORD=$ZLV_HTTP_PASSWORD

COPY --from=clevercloud/clever-tools /bin/clever /usr/local/bin/clever

# Installer les dépendances nécessaires
RUN apt-get update && \
apt-get install -y nginx apache2-utils curl gnupg && \
Expand All @@ -26,7 +34,7 @@ COPY src/ $DAGSTER_HOME
WORKDIR $DAGSTER_HOME

# Configurer l'authentification basique pour Nginx
RUN htpasswd -cb /etc/nginx/.htpasswd zlv zlv
RUN htpasswd -cb /etc/nginx/.htpasswd $ZLV_HTTP_USERNAME $ZLV_HTTP_PASSWORD
COPY docker/nginx/nginx.conf /etc/nginx/sites-available/default

# Exposer le port 8080 pour le serveur web Nginx
Expand Down
5 changes: 3 additions & 2 deletions analytics/dagster/logs/event.log
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
{"action": "GRAPHQL_QUERY_COMPLETED", "client_time": "2025-01-07 08:53:49.876000", "event_id": "4afc1c44-982d-4bc6-a8b3-6ed6328ae0c4", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"client_id": "05fdde35-9a31-4bf4-a851-823d66198481", "operationName": "AssetEventsQuery", "elapsedTime": "100.89999997615814"}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.4-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "GRAPHQL_QUERY_COMPLETED", "client_time": "2025-01-07 09:00:25.837000", "event_id": "000727bc-50c7-4c04-a23c-15f2db6ce95a", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"client_id": "05fdde35-9a31-4bf4-a851-823d66198481", "operationName": "RunRootQuery", "elapsedTime": "220.5"}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.4-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_success_event", "client_time": "2025-01-09 11:22:22.838835", "event_id": "7c11e930-235c-47a5-b4f6-1d724c01d08e", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "331c48224eff1759788e40174b635f328555bb04b1a4bbd1e7eac1dd2cf39354", "step_key_hash": "5310d1bc5da6c8b581cace1e131e053834acfbe8a4647d922e7cf17ce2e6b60d", "duration_ms": 250506.41840499884}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_start_event", "client_time": "2025-01-09 11:23:34.230866", "event_id": "f1fee084-6840-47f8-9f95-384e99cfe72e", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "075dffc13840fd3bd82e6f9da3e227d2efba9d25b67d44fdf1c81067d0fca22b", "step_key_hash": "f6cd558066b27dfbfa37ee3b6b7f9b33e26320d98fd68382dc210944693ff806"}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
{"action": "step_success_event", "client_time": "2025-01-09 11:25:39.837169", "event_id": "ed9b910b-9508-4923-b945-8391067cad80", "elapsed_time": "None", "instance_id": "3535e14f-e2a7-4b35-a902-c2e428480e12", "metadata": {"run_id_hash": "075dffc13840fd3bd82e6f9da3e227d2efba9d25b67d44fdf1c81067d0fca22b", "step_key_hash": "f6cd558066b27dfbfa37ee3b6b7f9b33e26320d98fd68382dc210944693ff806", "duration_ms": 125560.43714100088}, "python_version": "3.10.16", "dagster_version": "1.9.6", "os_desc": "Linux-6.10.14-linuxkit-x86_64-with-glibc2.36", "os_platform": "Linux", "run_storage_id": "8cb599b9-bf33-4d0d-932b-b5fa375d93e0", "is_known_ci_env": false}
8 changes: 7 additions & 1 deletion analytics/dagster/src/assets/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,11 @@
# from .production_dlt import dagster_production_assets
from .production_dbt import dbt_production_assets
from .clever import clevercloud_login_and_restart
from .dwh import __all__

# __all__ = ["dagster_production_assets"]
__all__ = ["dbt_production_assets"]
__all__ = [
"dbt_production_assets",
"clevercloud_login_and_restart"
]

3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/clever/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .restart import clevercloud_login_and_restart

__all__ = ["clevercloud_login_and_restart"]
16 changes: 16 additions & 0 deletions analytics/dagster/src/assets/clever/restart.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import subprocess
from dagster import asset
from ...config import Config

@asset(group_name="deployment", deps=["upload_duckdb_to_s3"], name="clevercloud_login_and_restart")
def clevercloud_login_and_restart():
    """Authenticate against Clever Cloud and restart the Metabase app.

    Runs the ``clever`` CLI twice: first ``clever login`` with the
    token/secret pair from ``Config``, then ``clever restart`` on the
    application identified by ``Config.METABASE_APP_ID``. Each call uses
    ``check=True`` so a non-zero exit status raises CalledProcessError
    and fails the asset.
    """
    # Authenticate the CLI session with credentials taken from Config.
    subprocess.run(
        ["clever", "login", "--token", Config.CLEVER_TOKEN, "--secret", Config.CLEVER_SECRET],
        check=True,
    )

    # Trigger a restart of the downstream Metabase application.
    subprocess.run(
        ["clever", "restart", "--app", Config.METABASE_APP_ID],
        check=True,
    )

    return f"Application {Config.METABASE_APP_ID} has been restarted."
19 changes: 8 additions & 11 deletions analytics/dagster/src/assets/dwh/__init__.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,16 @@
from .ingest.ingest_postgres_asset import (
import_postgres_data_from_replica_to_duckdb,
setup_replica_db,
)
from .ingest.ingest_lovac_ff_s3_asset import (
import_cerema_ff_lovac_data_from_s3_to_duckdb,
setup_s3_connection,
)
from .upload.upload_metabase_db_to_cellar import upload_duckdb_to_s3
from .copy.copy_to_clean_duckdb import copy_dagster_duckdb_to_metabase_duckdb
from .checks import check_ff_lovac_on_duckdb
from .copy import copy_dagster_duckdb_to_metabase_duckdb
from .ingest import import_postgres_data_from_replica_to_duckdb, import_cerema_ff_lovac_data_from_s3_to_duckdb, setup_replica_db, setup_s3_connection
from .upload import upload_duckdb_to_s3, upload_ff_to_s3, download_ff_from_s3

__all__ = [
"check_ff_lovac_on_duckdb",
"copy_dagster_duckdb_to_metabase_duckdb",
"import_postgres_data_from_replica_to_duckdb",
"import_cerema_ff_lovac_data_from_s3_to_duckdb",
"copy_dagster_duckdb_to_metabase_duckdb",
"setup_replica_db",
"setup_s3_connection",
"upload_duckdb_to_s3",
"upload_ff_to_s3",
"download_ff_from_s3",
]
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/dwh/checks/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .ff_table_exists import check_ff_lovac_on_duckdb

__all__ = ["check_ff_lovac_on_duckdb"]
32 changes: 32 additions & 0 deletions analytics/dagster/src/assets/dwh/checks/ff_table_exists.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from dagster import AssetSpec, multi_asset
from ..ingest.queries.lovac import lovac_tables_sql
from ..ingest.queries.ff import ff_tables_sql
from dagster_duckdb import DuckDBResource


# Union of every LOVAC and FF ingest query, keyed by target table name.
# Each key drives one "check_<table>" asset spec declared below.
# (Previous comment was copy-pasted from the upload asset and wrong; a
# stray debug print of all check names at import time is also removed.)
all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}


@multi_asset(
    specs=[
        # One "check_<table>" spec per ingested table, each downstream of
        # the matching "build_<table>" ingest asset.
        AssetSpec(
            f"check_{table_name}",
            kinds={"duckdb"},
            deps=[f"build_{table_name}"],
            group_name="check",
        )
        for table_name in all_tables_sql.keys()
    ],
    can_subset=True,
)
def check_ff_lovac_on_duckdb(duckdb: DuckDBResource):
    """Smoke-check that ingested data is present in the local DuckDB file.

    Raises a generic Exception when the probe query returns no rows;
    otherwise returns a confirmation string.

    NOTE(review): a spec is declared per table above, yet the body only
    probes ``ff.lovac`` and returns a single value rather than per-spec
    results — confirm whether each selected spec should be checked
    individually.
    """
    # NOTE(review): assumes a table/view named lovac exists in the ff
    # schema — the ingest queries visible elsewhere create raw_* tables;
    # verify this object name.
    query = "SELECT * FROM ff.lovac LIMIT 1;"
    with duckdb.get_connection() as conn:
        res = conn.execute(query)
        # fetchdf() materialises the result; empty means the query
        # returned no rows at all.
        if res.fetchdf().empty:
            raise Exception("No data in ff.lovac table")
        else:
            return "Data found in ff.lovac table"
3 changes: 3 additions & 0 deletions analytics/dagster/src/assets/dwh/copy/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from .copy_to_clean_duckdb import copy_dagster_duckdb_to_metabase_duckdb

__all__ = ["copy_dagster_duckdb_to_metabase_duckdb"]
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from dagster import AssetKey
from ..config import RESULT_TABLES, translate_table_name
from ....config import RESULT_TABLES, translate_table_name
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset

Expand Down
9 changes: 9 additions & 0 deletions analytics/dagster/src/assets/dwh/ingest/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from .ingest_lovac_ff_s3_asset import import_cerema_ff_lovac_data_from_s3_to_duckdb, setup_s3_connection
from .ingest_postgres_asset import import_postgres_data_from_replica_to_duckdb, setup_replica_db

__all__ = [
"import_postgres_data_from_replica_to_duckdb",
"import_cerema_ff_lovac_data_from_s3_to_duckdb",
"setup_replica_db",
"setup_s3_connection",
]
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
from dagster import AssetKey, asset
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset
from ..config import Config
from ....config import Config
from .queries.lovac import lovac_tables_sql
from .queries.ff import ff_tables_sql


all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}


@asset(name="setup_connection_s3", description="Load all tables into DuckDB")
@asset(name="setup_s3_connection", description="Load all tables into DuckDB")
def setup_s3_connection(context, duckdb: DuckDBResource):
query = f"""
CREATE OR REPLACE PERSISTENT SECRET SECRET (
Expand All @@ -23,7 +23,10 @@ def setup_s3_connection(context, duckdb: DuckDBResource):
with duckdb.get_connection() as conn:
context.log.info(f"Executing SQL: {query}")
conn.execute(query)
schema_query = "CREATE SCHEMA IF NOT EXISTS ff;"
schema_query = """
CREATE SCHEMA IF NOT EXISTS ff;
CREATE SCHEMA IF NOT EXISTS lovac;
"""
context.log.info(f"Executing SQL: {schema_query}")
conn.execute(schema_query)

Expand All @@ -40,7 +43,7 @@ def process_subset(name: str, context: AssetExecutionContext, duckdb: DuckDBReso
@multi_asset(
specs=[
AssetSpec(
f"raw_{name}",
f"build_{name}",
deps=["setup_s3_connection"],
kinds={"duckdb", "s3"},
group_name="lovac" if "lovac" in name else "ff",
Expand All @@ -55,9 +58,9 @@ def import_cerema_ff_lovac_data_from_s3_to_duckdb(
context.log.info("Importing data from replica to DuckDB")
context.log.info("duckdb: " + duckdb.__str__())
for name in all_tables_sql:
if AssetKey(f"raw_{name}") in context.op_execution_context.selected_asset_keys:
if AssetKey(f"build_{name}") in context.op_execution_context.selected_asset_keys:
context.log.info(f"Found {name} in selected_asset_keys")
process_subset(name, context, duckdb)
yield MaterializeResult(asset_key=f"raw_{name}")
yield MaterializeResult(asset_key=f"build_{name}")
else:
pass
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from dagster import AssetKey, asset
from dagster_duckdb import DuckDBResource
from dagster import AssetExecutionContext, AssetSpec, MaterializeResult, multi_asset
from ..config import Config
from ....config import Config
from .queries.production import production_tables


Expand Down
4 changes: 2 additions & 2 deletions analytics/dagster/src/assets/dwh/ingest/queries/ff.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
from ...config import Config
from .....config import Config

SCHEMA = "ff"

ff_tables_sql = {
"raw_ff_2023": f"""
CREATE OR REPLACE TABLE {SCHEMA}.raw_ff_2023 AS (
SELECT * FROM read_csv('s3://{Config.CELLAR_DATA_LAKE_BUCKET_NAME}/ff/2023/raw.csv',
SELECT * FROM read_csv('s3://{Config.CELLAR_DATA_LAKE_BUCKET_NAME}/lake/ff/2023/raw.csv',
auto_detect = TRUE,
ignore_errors = false, types = {{'ff_ccogrm': 'VARCHAR',}}
)
Expand Down
4 changes: 2 additions & 2 deletions analytics/dagster/src/assets/dwh/ingest/queries/lovac.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ...config import Config
from .....config import Config

SCHEMA = "lovac"

Expand All @@ -21,7 +21,7 @@
'ff_ccogrm_6': 'VARCHAR',
}}));""",
"raw_lovac_2023": f"""
CREATE OR REPLACE TABLE {SCHEMA}.raw_lovac_2024 AS (
CREATE OR REPLACE TABLE {SCHEMA}.raw_lovac_2023 AS (
SELECT * FROM read_csv('s3://{Config.CELLAR_DATA_LAKE_BUCKET_NAME}/lake/lovac/2023/raw.csv', auto_detect = TRUE,
types = {{
'ff_jdatnss_6': 'VARCHAR',
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from ...config import Config
from .....config import Config


SCHEMA = "production"
Expand Down
2 changes: 1 addition & 1 deletion analytics/dagster/src/assets/dwh/setup_duckdb.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dagster import asset
from dagster_duckdb import DuckDBResource
from .config import Config
from ...config import Config


@asset(
Expand Down
5 changes: 5 additions & 0 deletions analytics/dagster/src/assets/dwh/upload/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
from .download_ff_db_from_cellar import download_ff_from_s3
from .upload_ff_db_to_cellar import upload_ff_to_s3
from .upload_metabase_db_to_cellar import upload_duckdb_to_s3

__all__ = ["upload_duckdb_to_s3", "upload_ff_to_s3", "download_ff_from_s3"]
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
from dagster import asset
from ....config import Config
from .utils import download

from dagster_duckdb import DuckDBResource

# Asset that restores the FF/LOVAC DuckDB database file from S3 (Cellar).
@asset(
    name="download_ff_from_s3",
    group_name="upload",
)
def download_ff_from_s3(duckdb: DuckDBResource):
    """Download the FF/LOVAC DuckDB database file from the Cellar bucket.

    The object at ``Config.CELLAR_STATE_FF_LOVAC_KEY_PATH`` is written
    over the local file backing the ``duckdb`` resource.
    """
    s3_bucket = Config.CELLAR_DATA_LAKE_BUCKET_NAME
    s3_key = Config.CELLAR_STATE_FF_LOVAC_KEY_PATH
    file_path = duckdb.database  # local path of the DuckDB database file

    download(file_path, s3_bucket, s3_key)

    # Bug fix: the previous message (and comments) claimed the file was
    # *uploaded*; this asset downloads it.
    return f"Downloaded DuckDB database from s3://{s3_bucket}/{s3_key}"
25 changes: 25 additions & 0 deletions analytics/dagster/src/assets/dwh/upload/upload_ff_db_to_cellar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
from dagster import AssetKey, asset
from ....config import RESULT_TABLES, Config
from .utils import upload
from ..ingest.queries.lovac import lovac_tables_sql
from ..ingest.queries.ff import ff_tables_sql
from dagster_duckdb import DuckDBResource
import boto3


# Union of every LOVAC and FF ingest query, keyed by target table name;
# used below to declare one upstream "build_<table>" dependency per table.
all_tables_sql = {**lovac_tables_sql, **ff_tables_sql}

@asset(
    name="upload_ff_to_s3",
    deps={AssetKey(f"build_{table_name}") for table_name in all_tables_sql.keys()},
    group_name="upload",
)
def upload_ff_to_s3(duckdb: DuckDBResource):
    """Push the local DuckDB database file to the Cellar S3 bucket.

    Depends on every "build_<table>" ingest asset so the upload only runs
    once all tables have been materialised into the local DuckDB file.
    """
    bucket = Config.CELLAR_DATA_LAKE_BUCKET_NAME
    key = Config.CELLAR_STATE_FF_LOVAC_KEY_PATH
    # The duckdb resource exposes the path of its backing database file.
    database_path = duckdb.database

    upload(database_path, bucket, key)

    return f"Uploaded DuckDB metabase to s3://{bucket}/{key}"
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from dagster import AssetKey, asset
from ..config import RESULT_TABLES, Config
from ....config import RESULT_TABLES, Config
from .utils import upload

from dagster_duckdb import DuckDBResource
import boto3


# Asset for uploading the DuckDB metabase file to S3
@asset(
name="upload_duckdb_to_s3",
deps={AssetKey(f"copy_{table_name}") for table_name in RESULT_TABLES},
group_name="upload",
)
Expand All @@ -14,24 +17,6 @@ def upload_duckdb_to_s3(duckdb_metabase: DuckDBResource):
s3_key = Config.CELLAR_METABASE_KEY_PATH
file_path = duckdb_metabase.database # Path to the DuckDB metabase file

print(f"Uploading DuckDB metabase to s3://{s3_bucket}/{s3_key}")
print("Credentials, endpoint and region:")

print(f"Access key ID: {Config.CELLAR_ACCESS_KEY_ID}")
print(f"Secret access key: {Config.CELLAR_SECRET_ACCESS_KEY}")
print(f"Endpoint URL: {Config.CELLAR_HTTP_HOST_URL}")

# Initialize S3 client
s3_client = boto3.client(
"s3",
region_name=Config.CELLAR_REGION,
aws_access_key_id=Config.CELLAR_ACCESS_KEY_ID,
aws_secret_access_key=Config.CELLAR_SECRET_ACCESS_KEY,
endpoint_url=Config.CELLAR_HTTP_HOST_URL,
)

# Upload the DuckDB metabase file to S3
with open(file_path, "rb") as f:
s3_client.upload_fileobj(f, s3_bucket, s3_key)
upload(file_path, s3_bucket, s3_key)

return f"Uploaded DuckDB metabase to s3://{s3_bucket}/{s3_key}"
45 changes: 45 additions & 0 deletions analytics/dagster/src/assets/dwh/upload/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
from ....config import Config
import boto3

def upload(source_file_path: str, s3_bucket: str, s3_key: str):
    """Upload a local file to s3://{s3_bucket}/{s3_key} on Cellar.

    Credentials, region and endpoint come from ``Config``. Raises on any
    boto3/IO failure.
    """
    print(f"Uploading DuckDB metabase to s3://{s3_bucket}/{s3_key}")
    # SECURITY FIX: the previous version printed the access key id AND the
    # secret access key, leaking credentials into logs. Never log secrets.

    # Initialize S3 client against the Cellar endpoint.
    s3_client = boto3.client(
        "s3",
        region_name=Config.CELLAR_REGION,
        aws_access_key_id=Config.CELLAR_ACCESS_KEY_ID,
        aws_secret_access_key=Config.CELLAR_SECRET_ACCESS_KEY,
        endpoint_url=Config.CELLAR_HTTP_HOST_URL,
    )

    # Stream the file to S3 without loading it fully into memory.
    with open(source_file_path, "rb") as f:
        s3_client.upload_fileobj(f, s3_bucket, s3_key)


def download(destination_file_path: str, s3_bucket: str, s3_key: str):
    """Download s3://{s3_bucket}/{s3_key} from Cellar to a local file.

    Credentials, region and endpoint come from ``Config``. Raises on any
    boto3/IO failure.
    """
    print(f"Downloading DuckDB metabase from s3://{s3_bucket}/{s3_key}")
    # SECURITY FIX: the previous version printed the access key id AND the
    # secret access key, leaking credentials into logs. Never log secrets.

    # Initialize S3 client against the Cellar endpoint.
    s3_client = boto3.client(
        "s3",
        region_name=Config.CELLAR_REGION,
        aws_access_key_id=Config.CELLAR_ACCESS_KEY_ID,
        aws_secret_access_key=Config.CELLAR_SECRET_ACCESS_KEY,
        endpoint_url=Config.CELLAR_HTTP_HOST_URL,
    )

    # Stream the object from S3 straight into the destination file.
    with open(destination_file_path, "wb") as f:
        s3_client.download_fileobj(s3_bucket, s3_key, f)
Loading

0 comments on commit 0cc2b10

Please sign in to comment.