diff --git a/.github/workflows/dagster-cloud-deploy.yml b/.github/workflows/dagster-cloud-deploy.yml index 2bb0ea5..d56286c 100644 --- a/.github/workflows/dagster-cloud-deploy.yml +++ b/.github/workflows/dagster-cloud-deploy.yml @@ -113,27 +113,27 @@ jobs: # username: _json_key # password: ${{ secrets.GCR_JSON_KEY }} - # Build "example_location" location. + # Build "quickstart_etl" location. # For each code location, the "build-push-action" builds the docker # image and a "set-build-output" command records the image tag for each code location. # To re-use the same docker image across multiple code locations, build the docker image once # and specify the same tag in multiple "set-build-output" commands. To use a different docker # image for each code location, use multiple "build-push-actions" with a location specific # tag. - - name: Build and upload Docker image for "example_location" + - name: Build and upload Docker image for "quickstart_etl" if: steps.prerun.outputs.result != 'skip' uses: docker/build-push-action@v4 with: context: . push: true - tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-example-location + tags: ${{ env.IMAGE_REGISTRY }}:${{ env.IMAGE_TAG }}-quickstart-etl - - name: Update build session with image tag for example_location + - name: Update build session with image tag for quickstart_etl id: ci-set-build-output-example-location if: steps.prerun.outputs.result != 'skip' uses: dagster-io/dagster-cloud-action/actions/utils/dagster-cloud-cli@v0.1 with: - command: "ci set-build-output --location-name=data-eng-pipeline --image-tag=$IMAGE_TAG-example-location" + command: "ci set-build-output --location-name=data-eng-pipeline --image-tag=$IMAGE_TAG-quickstart-etl" # Deploy all code locations in this build session to Dagster Cloud - name: Deploy to Dagster Cloud diff --git a/README.md b/README.md index d1b303c..24e980d 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,7 @@ Click the `Use this Template` button and provide details for your new repo. ## Step 2. Add your Docker registry to `dagster_cloud.yaml` -The [`dagster_cloud.yaml`](./dagster_cloud.yaml) file defines the configuration for building and deploying your code locations. For the `example_location`, specify the Docker registry in the `registry:` key: +The [`dagster_cloud.yaml`](./dagster_cloud.yaml) file defines the configuration for building and deploying your code locations. For the `quickstart_etl`, specify the Docker registry in the `registry:` key: https://github.com/dagster-io/dagster-cloud-hybrid-quickstart/blob/669cc3acac00a070b38ec50e0c158b0c3d8b6996/dagster_cloud.yaml#L7 @@ -62,7 +62,7 @@ Set up secrets on your newly created repository by navigating to the `Settings` ## Step 5. Verify builds are successful -At this point, the workflow run should complete successfully and you should see the `example_location` in https://dagster.cloud. If builds are failing, ensure that your +At this point, the workflow run should complete successfully and you should see the `quickstart_etl` in https://dagster.cloud. If builds are failing, ensure that your secrets are properly set up the workflow properly sets up Docker regsitry access. Screen Shot 2022-08-08 at 9 07 25 PM @@ -87,7 +87,7 @@ https://github.com/dagster-io/dagster-cloud-hybrid-quickstart/blob/9f63f62b1a7ca ## Customize the Docker build process -A standard `Dockerfile` is included in this project and used to build the `example_location`. This file is used by the `build-push-action`: +A standard `Dockerfile` is included in this project and used to build the `quickstart_etl`. This file is used by the `build-push-action`: https://github.com/dagster-io/dagster-cloud-hybrid-quickstart/blob/fa0a0d3409fda4c342da41c970f568d32996747f/.github/workflows/dagster-cloud-deploy.yml#L123-L129 @@ -105,5 +105,5 @@ The `ci-init` step accepts a `location_names` input string containing a JSON lis project_dir: ${{ env.DAGSTER_PROJECT_DIR }} dagster_cloud_yaml_path: ${{ env.DAGSTER_CLOUD_YAML_PATH }} deployment: 'prod' - location_names: '["example_location1", "location2"]' # only deploy these two locations + location_names: '["quickstart_etl1", "location2"]' # only deploy these two locations ``` diff --git a/dagster_cloud.yaml b/dagster_cloud.yaml index eafca40..5804d33 100644 --- a/dagster_cloud.yaml +++ b/dagster_cloud.yaml @@ -1,8 +1,4 @@ locations: - - location_name: example_location + - location_name: quickstart_etl code_source: - package_name: my_dagster_project - build: - directory: ./ - registry: .dkr.ecr.us-west-2.amazonaws.com/branch-deployments-gh-action-test - + package_name: quickstart_etl diff --git a/my_dagster_project/__init__.py b/my_dagster_project/__init__.py deleted file mode 100644 index 54a0446..0000000 --- a/my_dagster_project/__init__.py +++ /dev/null @@ -1 +0,0 @@ -from .repository import my_dagster_project diff --git a/my_dagster_project/assets/__init__.py b/my_dagster_project/assets/__init__.py deleted file mode 100644 index 3643d49..0000000 --- a/my_dagster_project/assets/__init__.py +++ /dev/null @@ -1,17 +0,0 @@ -import csv -import requests -from dagster import asset - - -@asset -def cereals(): - response = requests.get("https://docs.dagster.io/assets/cereal.csv") - lines = response.text.split("\n") - cereal_rows = [row for row in csv.DictReader(lines)] - - return cereal_rows - - -# NOTE: To take advantage of Dagster's incremental re-execution functionality (e.g. retry from failure), -# you'll need to set up an IO manager that can move the data across runs. -# For more details, visit https://docs.dagster.io/concepts/io-management/io-managers#applying-io-managers-to-assets diff --git a/my_dagster_project/repository.py b/my_dagster_project/repository.py deleted file mode 100644 index 77036b4..0000000 --- a/my_dagster_project/repository.py +++ /dev/null @@ -1,11 +0,0 @@ -from dagster import load_assets_from_package_module, repository, define_asset_job - -from my_dagster_project import assets - - -@repository -def my_dagster_project(): - return [ - load_assets_from_package_module(assets), - define_asset_job(name="all_assets_job"), - ] diff --git a/my_dagster_project_tests/test_assets.py b/my_dagster_project_tests/test_assets.py deleted file mode 100644 index d0fa87e..0000000 --- a/my_dagster_project_tests/test_assets.py +++ /dev/null @@ -1,11 +0,0 @@ -from my_dagster_project.assets import nabisco_cereals - - -def test_nabisco_cereals(): - cereals = [ - {"name": "cereal1", "mfr": "N"}, - {"name": "cereal2", "mfr": "K"}, - ] - result = nabisco_cereals(cereals) - assert len(result) == 1 - assert result == [{"name": "cereal1", "mfr": "N"}] diff --git a/pyproject.toml b/pyproject.toml index fed528d..db34245 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,3 +1,6 @@ [build-system] requires = ["setuptools"] build-backend = "setuptools.build_meta" + +[tool.dagster] +module_name = "quickstart_etl" \ No newline at end of file diff --git a/quickstart_etl/__init__.py b/quickstart_etl/__init__.py new file mode 100644 index 0000000..3070b3e --- /dev/null +++ b/quickstart_etl/__init__.py @@ -0,0 +1,16 @@ +from dagster import ( + Definitions, + ScheduleDefinition, + define_asset_job, + load_assets_from_package_module, +) + +from . import assets + +daily_refresh_schedule = ScheduleDefinition( + job=define_asset_job(name="all_assets_job"), cron_schedule="0 0 * * *" +) + +defs = Definitions( + assets=load_assets_from_package_module(assets), schedules=[daily_refresh_schedule] +) diff --git a/quickstart_etl/assets/__init__.py b/quickstart_etl/assets/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/quickstart_etl/assets/hackernews.py b/quickstart_etl/assets/hackernews.py new file mode 100644 index 0000000..8abe026 --- /dev/null +++ b/quickstart_etl/assets/hackernews.py @@ -0,0 +1,83 @@ +import base64 +from io import BytesIO +from typing import List + +import matplotlib.pyplot as plt +import pandas as pd +import requests +from dagster import MetadataValue, OpExecutionContext, asset +from wordcloud import STOPWORDS, WordCloud + + +@asset(group_name="hackernews", compute_kind="HackerNews API") +def hackernews_topstory_ids() -> List[int]: + """Get up to 500 top stories from the HackerNews topstories endpoint. + + API Docs: https://github.com/HackerNews/API#new-top-and-best-stories + """ + newstories_url = "https://hacker-news.firebaseio.com/v0/topstories.json" + top_500_newstories = requests.get(newstories_url).json() + return top_500_newstories + + +@asset(group_name="hackernews", compute_kind="HackerNews API") +def hackernews_topstories( + context: OpExecutionContext, hackernews_topstory_ids: List[int] +) -> pd.DataFrame: + """Get items based on story ids from the HackerNews items endpoint. It may take 1-2 minutes to fetch all 500 items. + + API Docs: https://github.com/HackerNews/API#items + """ + results = [] + for item_id in hackernews_topstory_ids: + item = requests.get(f"https://hacker-news.firebaseio.com/v0/item/{item_id}.json").json() + results.append(item) + if len(results) % 20 == 0: + context.log.info(f"Got {len(results)} items so far.") + + df = pd.DataFrame(results) + + # Dagster supports attaching arbitrary metadata to asset materializations. This metadata will be + # shown in the run logs and also be displayed on the "Activity" tab of the "Asset Details" page in the UI. + # This metadata would be useful for monitoring and maintaining the asset as you iterate. + # Read more about in asset metadata in https://docs.dagster.io/concepts/assets/software-defined-assets#recording-materialization-metadata + context.add_output_metadata( + { + "num_records": len(df), + "preview": MetadataValue.md(df.head().to_markdown()), + } + ) + return df + + +@asset(group_name="hackernews", compute_kind="Plot") +def hackernews_topstories_word_cloud( + context: OpExecutionContext, hackernews_topstories: pd.DataFrame +) -> bytes: + """Exploratory analysis: Generate a word cloud from the current top 500 HackerNews top stories. + Embed the plot into a Markdown metadata for quick view. + + Read more about how to create word clouds in http://amueller.github.io/word_cloud/. + """ + stopwords = set(STOPWORDS) + stopwords.update(["Ask", "Show", "HN"]) + titles_text = " ".join([str(item) for item in hackernews_topstories["title"]]) + titles_cloud = WordCloud(stopwords=stopwords, background_color="white").generate(titles_text) + + # Generate the word cloud image + plt.figure(figsize=(8, 8), facecolor=None) + plt.imshow(titles_cloud, interpolation="bilinear") + plt.axis("off") + plt.tight_layout(pad=0) + + # Save the image to a buffer and embed the image into Markdown content for quick view + buffer = BytesIO() + plt.savefig(buffer, format="png") + image_data = base64.b64encode(buffer.getvalue()) + md_content = f"![img](data:image/png;base64,{image_data.decode()})" + + # Attach the Markdown content as metadata to the asset + # Read about more metadata types in https://docs.dagster.io/_apidocs/ops#metadata-types + context.add_output_metadata({"plot": MetadataValue.md(md_content)}) + + return image_data diff --git a/my_dagster_project_tests/__init__.py b/quickstart_etl_tests/__init__.py similarity index 100% rename from my_dagster_project_tests/__init__.py rename to quickstart_etl_tests/__init__.py diff --git a/quickstart_etl_tests/test_assets.py b/quickstart_etl_tests/test_assets.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/quickstart_etl_tests/test_assets.py @@ -0,0 +1 @@ + diff --git a/setup.cfg b/setup.cfg index c1f9d7f..c39bb92 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,2 +1,2 @@ [metadata] -name = my_dagster_project +name = quickstart_etl diff --git a/setup.py b/setup.py index def81eb..9b0d0cc 100644 --- a/setup.py +++ b/setup.py @@ -1,10 +1,17 @@ from setuptools import find_packages, setup -if __name__ == "__main__": - setup( - name="my_dagster_project", - packages=find_packages(exclude=["my_dagster_project_tests"]), - install_requires=[ - "dagster", - ], - ) +setup( + name="quickstart_etl", + packages=find_packages(exclude=["quickstart_etl_tests"]), + install_requires=[ + "dagster", + "dagster-cloud", + "boto3", + "pandas", + "matplotlib", + "textblob", + "tweepy", + "wordcloud", + ], + extras_require={"dev": ["dagit", "pytest"]}, +)