diff --git a/.github/workflows/quality-check-dagster-teradata.yml b/.github/workflows/quality-check-dagster-teradata.yml new file mode 100644 index 0000000..7e97610 --- /dev/null +++ b/.github/workflows/quality-check-dagster-teradata.yml @@ -0,0 +1,12 @@ +name: quality-check-dagster-teradata +on: + pull_request: + types: [opened, synchronize, reopened] + paths: + - 'libraries/dagster-teradata/**' + +jobs: + check: + uses: ./.github/workflows/template-quality-check.yml + with: + working_directory: ./libraries/dagster-teradata diff --git a/.github/workflows/release-dagster-teradata.yml b/.github/workflows/release-dagster-teradata.yml new file mode 100644 index 0000000..a55b5cb --- /dev/null +++ b/.github/workflows/release-dagster-teradata.yml @@ -0,0 +1,14 @@ +name: build-and-release-dagster-teradata + +on: + push: + tags: + - 'dagster_teradata-*.*.*' + +jobs: + build-and-release-dagster-teradata: + uses: ./.github/workflows/template-release.yml + with: + library_name: dagster-teradata + working_directory: ./libraries/dagster-teradata + secrets: inherit diff --git a/libraries/dagster-teradata/Makefile b/libraries/dagster-teradata/Makefile index 6d85a48..003b491 100644 --- a/libraries/dagster-teradata/Makefile +++ b/libraries/dagster-teradata/Makefile @@ -5,7 +5,7 @@ build: uv build test: - uv run pytest + uv run pytest --ignore-glob='*compute_cluster.py' ruff: uv run ruff check --fix . diff --git a/libraries/dagster-teradata/README.md b/libraries/dagster-teradata/README.md index 01c20cb..28ddda7 100644 --- a/libraries/dagster-teradata/README.md +++ b/libraries/dagster-teradata/README.md @@ -1,13 +1,138 @@ -# example-integration +# dagster-teradata -## Test +A dagster module that provides integration with [Teradata Vantage](https://www.teradata.com/). -```sh -make test +## Installation +The `dagster_teradata` module is available as a PyPI package - install with your preferred python +environment manager. 
+ +``` +source .venv/bin/activate +pip install dagster-teradata +``` + +## Example Usage + +This offers seamless integration with Teradata Vantage, facilitating efficient workflows for data processing, management, +and transformation. This module supports a range of scenarios, such as executing queries, managing tables, +and integrating with cloud storage solutions like AWS S3 and Azure Data Lake Storage (ADLS). Additionally, +it enables compute cluster management for Teradata Vantage Cloud Lake. + +```python +import os +import pytest +from dagster import job, op +from dagster_teradata import TeradataResource + +td_resource = TeradataResource( + host=os.getenv("TERADATA_HOST"), + user=os.getenv("TERADATA_USER"), + password=os.getenv("TERADATA_PASSWORD"), + database=os.getenv("TERADATA_DATABASE"), +) + +@pytest.mark.integration +def test_execute_query(tmp_path): + @op(required_resource_keys={"teradata"}) + def example_test_execute_query(context): + result = context.resources.teradata.execute_queries( + ["select order_id from orders_24", "select order_id from orders_25"], True + ) + context.log.info(result) + + @job(resource_defs={"teradata": td_resource}) + def example_job(): + example_test_execute_query() + + example_job.execute_in_process(resources={"teradata": td_resource}) ``` +```python +import os +import pytest +from dagster import job, op +from dagster_teradata import TeradataResource + +td_resource = TeradataResource( + host=os.getenv("TERADATA_HOST"), + user=os.getenv("TERADATA_USER"), + password=os.getenv("TERADATA_PASSWORD"), + database=os.getenv("TERADATA_DATABASE"), +) + +@pytest.mark.integration +def test_drop_table(tmp_path): + @op(required_resource_keys={"teradata"}) + def example_test_drop_table(context): + result = context.resources.teradata.drop_table(["process_tmp1", "process_tmp2"]) + context.log.info(result) -## Build + @job(resource_defs={"teradata": td_resource}) + def example_job(): + example_test_drop_table() + + 
example_job.execute_in_process(resources={"teradata": td_resource}) +``` + +Here is another example of compute cluster management in Teradata VantageCloud Lake: + +```python +import os + +import pytest +from dagster import job, op +from dagster_teradata import teradata_resource + + +@pytest.mark.integration +def test_create_teradata_compute_cluster(tmp_path): +    @op(required_resource_keys={"teradata"}) +    def example_create_teradata_compute_cluster(context): +        """Args for create_teradata_compute_cluster(): +        compute_profile_name: Name of the Compute Profile to manage. +        compute_group_name: Name of compute group to which compute profile belongs. +        query_strategy: Query strategy to use. Refers to the approach or method used by the +            Teradata Optimizer to execute SQL queries efficiently within a Teradata compute cluster. +            Valid query_strategy value is either 'STANDARD' or 'ANALYTIC'. Default at database level is STANDARD +        compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster profile refers +            to the mapping of compute resources to a specific node or set of nodes within the cluster. +        compute_attribute: Optional attributes of compute profile. Example compute attribute +            MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE') +        compute_attribute (str, optional): Additional attributes for compute profile. Defaults to None. 
+ """ + context.resources.teradata.create_teradata_compute_cluster( + "ShippingCG01", + "Shipping", + "STANDARD", + "TD_COMPUTE_MEDIUM", + "MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('FALSE')", + ) + + @job(resource_defs={"teradata": teradata_resource}) + def example_job(): + example_create_teradata_compute_cluster() + + example_job.execute_in_process( + run_config={ + "resources": { + "teradata": { + "config": { + "host": os.getenv("TERADATA_HOST"), + "user": os.getenv("TERADATA_USER"), + "password": os.getenv("TERADATA_PASSWORD"), + "database": os.getenv("TERADATA_DATABASE"), + } + } + } + } + ) +``` + +## Development + +The `Makefile` provides the tools required to test and lint your local installation. ```sh -make build +make test +make ruff +make check ``` diff --git a/libraries/dagster-teradata/dagster_teradata/resources.py b/libraries/dagster-teradata/dagster_teradata/resources.py index e3b6adc..55c4e14 100644 --- a/libraries/dagster-teradata/dagster_teradata/resources.py +++ b/libraries/dagster-teradata/dagster_teradata/resources.py @@ -51,7 +51,6 @@ def _connection_args(self) -> Mapping[str, Any]: @public @contextmanager def get_connection(self): - if not self.host: raise ValueError("Host is required but not provided.") if not self.user: @@ -149,7 +148,7 @@ def get_database_info(teradata: TeradataResource): with self.get_connection() as conn: with closing(conn.cursor()) as cursor: - self.log.info("Executing query: " + sql) + # self.log.info("Executing query: " + sql) cursor.execute(sql) if fetch_results: if single_result_row: @@ -188,7 +187,7 @@ def create_fresh_database(teradata: TeradataResource): with self.get_connection() as conn: with closing(conn.cursor()) as cursor: for sql in sql_queries: - self.log.info("Executing query: " + sql) + # self.log.info("Executing query: " + sql) cursor.execute(sql) if fetch_results: results = results.append(cursor.fetchall()) # type: ignore diff --git 
a/libraries/dagster-teradata/dagster_teradata_tests/test_azure_to_teradata.py b/libraries/dagster-teradata/dagster_teradata_tests/test_azure_to_teradata.py index 8563766..6543689 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_azure_to_teradata.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_azure_to_teradata.py @@ -1,16 +1,15 @@ import os -import pytest from dagster import job, op from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken -from dagster_teradata import TeradataResource, teradata_resource +from dagster_teradata import TeradataResource -# azure_resource = ADLS2Resource( -# storage_account=os.getenv("AZURE_ACCOUNT"), -# credential={"key": os.getenv("AZURE_TOKEN")}, -# ) +azure_resource = ADLS2Resource( + storage_account=os.getenv("AZURE_ACCOUNT", ""), + credential=ADLS2SASToken(token=os.getenv("AZURE_TOKEN", "")), +) -teradata_resource = TeradataResource( +td_resource = TeradataResource( host=os.getenv("TERADATA_HOST"), user=os.getenv("TERADATA_USER"), password=os.getenv("TERADATA_PASSWORD"), @@ -18,16 +17,22 @@ ) -@pytest.mark.integration def test_azure_to_teradata(tmp_path): + @op(required_resource_keys={"teradata"}) + def drop_existing_table(context): + context.resources.teradata.drop_table("people") + @op(required_resource_keys={"teradata", "azure"}) def example_test_azure_to_teradata(context): context.resources.teradata.azure_blob_to_teradata( azure_resource, os.getenv("AZURE_LOCATION"), "people" ) - @job(resource_defs={"teradata": teradata_resource, "azure": azure_resource}) + @job(resource_defs={"teradata": td_resource, "azure": azure_resource}) def example_job(): + drop_existing_table() example_test_azure_to_teradata() - example_job.execute_in_process(resources={"azure": azure_resource, "teradata": teradata_resource}) + example_job.execute_in_process( + resources={"azure": azure_resource, "teradata": td_resource} + ) diff --git 
a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_create_teradata_compute_cluster.py b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_create_teradata_compute_cluster.py index 2d01dcd..1ea043b 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_create_teradata_compute_cluster.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_create_teradata_compute_cluster.py @@ -1,11 +1,9 @@ import os -import pytest from dagster import job, op from dagster_teradata import teradata_resource -@pytest.mark.integration def test_create_teradata_compute_cluster(tmp_path): @op(required_resource_keys={"teradata"}) def example_create_teradata_compute_cluster(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_drop_teradata_compute_cluster.py b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_drop_teradata_compute_cluster.py index 60abb46..981ec5f 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_drop_teradata_compute_cluster.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_drop_teradata_compute_cluster.py @@ -1,11 +1,9 @@ import os -import pytest from dagster import job, op from dagster_teradata import teradata_resource -@pytest.mark.integration def test_example_drop_teradata_compute_cluster(tmp_path): @op(required_resource_keys={"teradata"}) def example_drop_teradata_compute_cluster(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_resume_teradata_compute_cluster.py b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_resume_teradata_compute_cluster.py index 8fa9bbe..f6ccf58 100644 --- 
a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_resume_teradata_compute_cluster.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_resume_teradata_compute_cluster.py @@ -1,11 +1,9 @@ import os -import pytest from dagster import job, op from dagster_teradata import teradata_resource -@pytest.mark.integration def test_resume_teradata_compute_cluster(tmp_path): @op(required_resource_keys={"teradata"}) def example_resume_teradata_compute_cluster(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_suspend_teradata_compute_cluster.py b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_suspend_teradata_compute_cluster.py index 0ff0e21..01e1733 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_suspend_teradata_compute_cluster.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_compute_cluster_manager/test_suspend_teradata_compute_cluster.py @@ -1,11 +1,9 @@ import os -import pytest from dagster import job, op from dagster_teradata import teradata_resource -@pytest.mark.integration def test_suspend_teradata_compute_cluster(tmp_path): @op(required_resource_keys={"teradata"}) def example_suspend_teradata_compute_cluster(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_drop_database.py b/libraries/dagster-teradata/dagster_teradata_tests/test_drop_database.py index bc1cf2f..30b6e9c 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_drop_database.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_drop_database.py @@ -1,6 +1,5 @@ import os -import pytest from dagster import job, op from dagster_teradata import TeradataResource @@ -12,7 +11,6 @@ ) -@pytest.mark.integration def test_drop_database(tmp_path): @op(required_resource_keys={"teradata"}) def example_test_drop_database(context): diff --git 
a/libraries/dagster-teradata/dagster_teradata_tests/test_drop_table.py b/libraries/dagster-teradata/dagster_teradata_tests/test_drop_table.py index 3bbb423..166c135 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_drop_table.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_drop_table.py @@ -1,6 +1,5 @@ import os -import pytest from dagster import job, op from dagster_teradata import TeradataResource @@ -12,7 +11,6 @@ ) -@pytest.mark.integration def test_drop_table(tmp_path): @op(required_resource_keys={"teradata"}) def example_test_drop_table(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_execute_queries.py b/libraries/dagster-teradata/dagster_teradata_tests/test_execute_queries.py index b2aa3ab..81aef41 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_execute_queries.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_execute_queries.py @@ -1,6 +1,5 @@ import os -import pytest from dagster import job, op from dagster_teradata import TeradataResource @@ -12,7 +11,6 @@ ) -@pytest.mark.integration def test_execute_query(tmp_path): @op(required_resource_keys={"teradata"}) def example_test_execute_query(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_execute_query.py b/libraries/dagster-teradata/dagster_teradata_tests/test_execute_query.py index 5e49d92..e2e9764 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_execute_query.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_execute_query.py @@ -1,6 +1,5 @@ import os -import pytest from dagster import job, op from dagster_teradata import TeradataResource @@ -12,7 +11,6 @@ ) -@pytest.mark.integration def test_execute_query(tmp_path): @op(required_resource_keys={"teradata"}) def example_test_execute_query(context): diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_resources.py b/libraries/dagster-teradata/dagster_teradata_tests/test_resources.py 
index 24bbdd0..950debc 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_resources.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_resources.py @@ -1,14 +1,12 @@ import os import uuid -import pytest import teradatasql from dagster import asset, materialize from dagster._time import get_current_timestamp from dagster_teradata import TeradataResource, fetch_last_updated_timestamps -@pytest.mark.integration def test_resource(tmp_path): df = ["a"] @@ -57,7 +55,6 @@ def read_table(teradata: TeradataResource, insert_rows): ) -@pytest.mark.integration def test_resources_teradata_connection(): with TeradataResource( host=os.getenv("TERADATA_HOST"), @@ -68,7 +65,7 @@ def test_resources_teradata_connection(): # Teradata table names are expected to be capitalized. table_name = f"test_table_{str(uuid.uuid4()).replace('-', '_')}".lower() try: - start_time = get_current_timestamp() + start_time = round(get_current_timestamp()) conn.cursor().execute(f"create table {table_name} (foo varchar(10))") # Insert one row conn.cursor().execute(f"insert into {table_name} values ('bar')") @@ -81,9 +78,9 @@ def test_resources_teradata_connection(): ], # Teradata table names are expected uppercase. Test that lowercase also works. 
)[table_name].timestamp() - # end_time = get_current_timestamp() + end_time = round(get_current_timestamp()) - assert freshness_for_table > start_time + assert end_time > freshness_for_table > start_time finally: try: conn.cursor().execute(f"drop table {table_name}") diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata_S3Resource.py b/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata.py similarity index 85% rename from libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata_S3Resource.py rename to libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata.py index 89ef093..380f465 100644 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata_S3Resource.py +++ b/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata.py @@ -1,6 +1,5 @@ import os -import pytest from dagster import job, op from dagster_aws.s3 import S3Resource from dagster_teradata import TeradataResource @@ -19,8 +18,11 @@ ) -@pytest.mark.integration def test_s3_to_teradata(tmp_path): + @op(required_resource_keys={"teradata"}) + def drop_existing_table(context): + context.resources.teradata.drop_table("people") + @op(required_resource_keys={"teradata", "s3"}) def example_test_s3_to_teradata(context): context.resources.teradata.s3_to_teradata( @@ -29,6 +31,7 @@ def example_test_s3_to_teradata(context): @job(resource_defs={"teradata": td_resource, "s3": s3_resource}) def example_job(): + drop_existing_table() example_test_s3_to_teradata() example_job.execute_in_process( diff --git a/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata_session.py b/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata_session.py deleted file mode 100644 index 6a66003..0000000 --- a/libraries/dagster-teradata/dagster_teradata_tests/test_s3_to_teradata_session.py +++ /dev/null @@ -1,35 +0,0 @@ -import os - -import boto3 -import pytest -from dagster import job, op -from 
dagster_teradata import teradata_resource - - -@pytest.mark.integration -def test_s3_to_teradata(tmp_path): - @op(required_resource_keys={"teradata"}) - def example_test_s3_to_teradata(context): - session = boto3.Session() - context.resources.teradata.s3_to_teradata( - session, os.getenv("AWS_S3_LOCATION"), "people" - ) - - @job(resource_defs={"teradata": teradata_resource}) - def example_job(): - example_test_s3_to_teradata() - - example_job.execute_in_process( - run_config={ - "resources": { - "teradata": { - "config": { - "host": os.getenv("TERADATA_HOST"), - "user": os.getenv("TERADATA_USER"), - "password": os.getenv("TERADATA_PASSWORD"), - "database": os.getenv("TERADATA_DATABASE"), - } - } - } - } - )