Updated test cases
tallamohan committed Jan 20, 2025
1 parent 8ba91d1 commit 50bbf70
Showing 17 changed files with 183 additions and 79 deletions.
12 changes: 12 additions & 0 deletions .github/workflows/quality-check-dagster-teradata.yml
@@ -0,0 +1,12 @@
name: quality-check-dagster-teradata
on:
  pull_request:
    types: [opened, synchronize, reopened]
    paths:
      - 'libraries/dagster-teradata/**'

jobs:
  check:
    uses: ./.github/workflows/template-quality-check.yml
    with:
      working_directory: ./libraries/dagster-teradata
14 changes: 14 additions & 0 deletions .github/workflows/release-dagster-teradata.yml
@@ -0,0 +1,14 @@
name: build-and-release-dagster-teradata

on:
  push:
    tags:
      - 'dagster_teradata-*.*.*'

jobs:
  build-and-release-dagster-teradata:
    uses: ./.github/workflows/template-release.yml
    with:
      library_name: dagster-teradata
      working_directory: ./libraries/dagster-teradata
    secrets: inherit
2 changes: 1 addition & 1 deletion libraries/dagster-teradata/Makefile
@@ -5,7 +5,7 @@ build:
	uv build

test:
-	uv run pytest
+	uv run pytest --ignore-glob='*compute_cluster.py'

ruff:
	uv run ruff check --fix .
137 changes: 131 additions & 6 deletions libraries/dagster-teradata/README.md
@@ -1,13 +1,138 @@
-# example-integration
# dagster-teradata

-## Test
A Dagster module that provides integration with [Teradata Vantage](https://www.teradata.com/).

-```sh
-make test
## Installation
The `dagster_teradata` module is available as a PyPI package - install with your preferred Python
environment manager.

```
source .venv/bin/activate
pip install dagster-teradata
```

## Example Usage

This module offers seamless integration with Teradata Vantage, facilitating efficient workflows for data
processing, management, and transformation. It supports a range of scenarios, such as executing queries,
managing tables, and integrating with cloud storage solutions like AWS S3 and Azure Data Lake Storage (ADLS).
It also enables compute cluster management for Teradata VantageCloud Lake. The examples below illustrate
these scenarios.

```python
import os
import pytest
from dagster import job, op
from dagster_teradata import TeradataResource

td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)

@pytest.mark.integration
def test_execute_query(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_execute_query(context):
        result = context.resources.teradata.execute_queries(
            ["select order_id from orders_24", "select order_id from orders_25"], True
        )
        context.log.info(result)

    @job(resource_defs={"teradata": td_resource})
    def example_job():
        example_test_execute_query()

    example_job.execute_in_process(resources={"teradata": td_resource})
```
```python
import os
import pytest
from dagster import job, op
from dagster_teradata import TeradataResource

td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)

@pytest.mark.integration
def test_drop_table(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_drop_table(context):
        result = context.resources.teradata.drop_table(["process_tmp1", "process_tmp2"])
        context.log.info(result)

    @job(resource_defs={"teradata": td_resource})
    def example_job():
        example_test_drop_table()

    example_job.execute_in_process(resources={"teradata": td_resource})
```
-## Build
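
The same resource can also load data from cloud object storage into Vantage. The sketch below mirrors the Azure Data Lake Storage test case included in this commit; the `AZURE_ACCOUNT`, `AZURE_TOKEN`, and `AZURE_LOCATION` environment variables and the `people` table are placeholders for your own values.

```python
import os

from dagster import job, op
from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken
from dagster_teradata import TeradataResource

azure_resource = ADLS2Resource(
    storage_account=os.getenv("AZURE_ACCOUNT", ""),
    credential=ADLS2SASToken(token=os.getenv("AZURE_TOKEN", "")),
)

td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)

@op(required_resource_keys={"teradata", "azure"})
def load_blob_to_teradata(context):
    # Copy the blob(s) at AZURE_LOCATION into the Teradata table "people".
    context.resources.teradata.azure_blob_to_teradata(
        azure_resource, os.getenv("AZURE_LOCATION"), "people"
    )

@job(resource_defs={"teradata": td_resource, "azure": azure_resource})
def example_job():
    load_blob_to_teradata()

example_job.execute_in_process(
    resources={"azure": azure_resource, "teradata": td_resource}
)
```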

Here is another example of compute cluster management in Teradata VantageCloud Lake:

```python
import os

import pytest
from dagster import job, op
from dagster_teradata import teradata_resource


@pytest.mark.integration
def test_create_teradata_compute_cluster(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_create_teradata_compute_cluster(context):
        """Args for create_teradata_compute_cluster():
        compute_profile_name: Name of the Compute Profile to manage.
        compute_group_name: Name of the compute group to which the compute profile belongs.
        query_strategy: Query strategy to use. Refers to the approach or method used by the
            Teradata Optimizer to execute SQL queries efficiently within a Teradata compute
            cluster. Valid values are 'STANDARD' and 'ANALYTIC'; the default at the database
            level is STANDARD.
        compute_map: ComputeMapName of the compute map. The compute_map in a compute cluster
            profile refers to the mapping of compute resources to a specific node or set of
            nodes within the cluster.
        compute_attribute (str, optional): Additional attributes of the compute profile, for
            example MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(5) INITIALLY_SUSPENDED('FALSE').
            Defaults to None.
        """
        context.resources.teradata.create_teradata_compute_cluster(
            "ShippingCG01",
            "Shipping",
            "STANDARD",
            "TD_COMPUTE_MEDIUM",
            "MIN_COMPUTE_COUNT(1) MAX_COMPUTE_COUNT(1) INITIALLY_SUSPENDED('FALSE')",
        )

    @job(resource_defs={"teradata": teradata_resource})
    def example_job():
        example_create_teradata_compute_cluster()

    example_job.execute_in_process(
        run_config={
            "resources": {
                "teradata": {
                    "config": {
                        "host": os.getenv("TERADATA_HOST"),
                        "user": os.getenv("TERADATA_USER"),
                        "password": os.getenv("TERADATA_PASSWORD"),
                        "database": os.getenv("TERADATA_DATABASE"),
                    }
                }
            }
        }
    )
```
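
The test files updated in this commit also exercise drop, suspend, and resume operations for compute clusters. The sketch below is illustrative only: it assumes these methods accept the same compute profile and compute group names as `create_teradata_compute_cluster`, since their exact signatures are not shown in this diff.

```python
from dagster import job, op
from dagster_teradata import teradata_resource


@op(required_resource_keys={"teradata"})
def example_manage_compute_cluster(context):
    # Assumed method names, mirroring the ops in the updated test files.
    context.resources.teradata.suspend_teradata_compute_cluster("ShippingCG01", "Shipping")
    context.resources.teradata.resume_teradata_compute_cluster("ShippingCG01", "Shipping")
    context.resources.teradata.drop_teradata_compute_cluster("ShippingCG01", "Shipping")


@job(resource_defs={"teradata": teradata_resource})
def example_job():
    example_manage_compute_cluster()
```

`example_job` would be configured and executed with the same `run_config` as the create example above.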

## Development

The `Makefile` provides the tools required to test and lint your local installation.

```sh
make build
make test
make ruff
make check
```
5 changes: 2 additions & 3 deletions libraries/dagster-teradata/dagster_teradata/resources.py
@@ -51,7 +51,6 @@ def _connection_args(self) -> Mapping[str, Any]:
    @public
    @contextmanager
    def get_connection(self):
-
        if not self.host:
            raise ValueError("Host is required but not provided.")
        if not self.user:
@@ -149,7 +148,7 @@ def get_database_info(teradata: TeradataResource):

        with self.get_connection() as conn:
            with closing(conn.cursor()) as cursor:
-                self.log.info("Executing query: " + sql)
+                # self.log.info("Executing query: " + sql)
                cursor.execute(sql)
                if fetch_results:
                    if single_result_row:
@@ -188,7 +187,7 @@ def create_fresh_database(teradata: TeradataResource):
        with self.get_connection() as conn:
            with closing(conn.cursor()) as cursor:
                for sql in sql_queries:
-                    self.log.info("Executing query: " + sql)
+                    # self.log.info("Executing query: " + sql)
                    cursor.execute(sql)
                    if fetch_results:
                        results.append(cursor.fetchall())
@@ -1,33 +1,38 @@
import os

import pytest
from dagster import job, op
from dagster_azure.adls2 import ADLS2Resource, ADLS2SASToken
-from dagster_teradata import TeradataResource, teradata_resource
+from dagster_teradata import TeradataResource

-# azure_resource = ADLS2Resource(
-#     storage_account=os.getenv("AZURE_ACCOUNT"),
-#     credential={"key": os.getenv("AZURE_TOKEN")},
-# )
+azure_resource = ADLS2Resource(
+    storage_account=os.getenv("AZURE_ACCOUNT", ""),
+    credential=ADLS2SASToken(token=os.getenv("AZURE_TOKEN", "")),
+)

-teradata_resource = TeradataResource(
+td_resource = TeradataResource(
    host=os.getenv("TERADATA_HOST"),
    user=os.getenv("TERADATA_USER"),
    password=os.getenv("TERADATA_PASSWORD"),
    database=os.getenv("TERADATA_DATABASE"),
)


@pytest.mark.integration
def test_azure_to_teradata(tmp_path):
+    @op(required_resource_keys={"teradata"})
+    def drop_existing_table(context):
+        context.resources.teradata.drop_table("people")
+
    @op(required_resource_keys={"teradata", "azure"})
    def example_test_azure_to_teradata(context):
        context.resources.teradata.azure_blob_to_teradata(
            azure_resource, os.getenv("AZURE_LOCATION"), "people"
        )

-    @job(resource_defs={"teradata": teradata_resource, "azure": azure_resource})
+    @job(resource_defs={"teradata": td_resource, "azure": azure_resource})
    def example_job():
+        drop_existing_table()
        example_test_azure_to_teradata()

-    example_job.execute_in_process(resources={"azure": azure_resource, "teradata": teradata_resource})
+    example_job.execute_in_process(
+        resources={"azure": azure_resource, "teradata": td_resource}
+    )
@@ -1,11 +1,9 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import teradata_resource


-@pytest.mark.integration
def test_create_teradata_compute_cluster(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_create_teradata_compute_cluster(context):
@@ -1,11 +1,9 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import teradata_resource


-@pytest.mark.integration
def test_example_drop_teradata_compute_cluster(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_drop_teradata_compute_cluster(context):
@@ -1,11 +1,9 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import teradata_resource


-@pytest.mark.integration
def test_resume_teradata_compute_cluster(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_resume_teradata_compute_cluster(context):
@@ -1,11 +1,9 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import teradata_resource


-@pytest.mark.integration
def test_suspend_teradata_compute_cluster(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_suspend_teradata_compute_cluster(context):
@@ -1,6 +1,5 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import TeradataResource

@@ -12,7 +11,6 @@
)


-@pytest.mark.integration
def test_drop_database(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_drop_database(context):
@@ -1,6 +1,5 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import TeradataResource

@@ -12,7 +11,6 @@
)


-@pytest.mark.integration
def test_drop_table(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_drop_table(context):
@@ -1,6 +1,5 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import TeradataResource

@@ -12,7 +11,6 @@
)


-@pytest.mark.integration
def test_execute_query(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_execute_query(context):
@@ -1,6 +1,5 @@
import os

-import pytest
from dagster import job, op
from dagster_teradata import TeradataResource

@@ -12,7 +11,6 @@
)


-@pytest.mark.integration
def test_execute_query(tmp_path):
    @op(required_resource_keys={"teradata"})
    def example_test_execute_query(context):
