diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml deleted file mode 100644 index 2cfb67e1..00000000 --- a/.github/ISSUE_TEMPLATE/bug_report.yml +++ /dev/null @@ -1,84 +0,0 @@ -name: 🐞 Bug -description: Report a bug or an issue you've found with dbt-athena -title: "[Bug] " -labels: ["bug", "triage"] -body: - - type: markdown - attributes: - value: | - Thanks for taking the time to fill out this bug report! - - type: checkboxes - attributes: - label: Is this a new bug in dbt-athena? - description: > - In other words, is this an error, flaw, failure or fault in our software? - - If this is a bug that broke existing functionality that used to work, please open a regression issue. - If this is a bug in the dbt-core logic, please open an issue in the dbt-core repository. - If this is a bug experienced while using dbt Cloud, please report to [support](mailto:support@getdbt.com). - If this is a request for help or troubleshooting code in your own dbt project, please join our [dbt Community Slack](https://www.getdbt.com/community/join-the-community/) or open a [Discussion question](https://github.com/dbt-labs/docs.getdbt.com/discussions). - - Please search to see if an issue already exists for the bug you encountered. - options: - - label: I believe this is a new bug in dbt-athena - required: true - - label: I have searched the existing issues, and I could not find an existing issue for this bug - required: true - - type: textarea - attributes: - label: Current Behavior - description: A concise description of what you're experiencing. - validations: - required: true - - type: textarea - attributes: - label: Expected Behavior - description: A concise description of what you expected to happen. - validations: - required: true - - type: textarea - attributes: - label: Steps To Reproduce - description: Steps to reproduce the behavior. - placeholder: | - 1. In this environment... - 2. With this config... - 3. Run '...' - 4. See error... - validations: - required: true - - type: textarea - id: logs - attributes: - label: Relevant log output - description: | - If applicable, log output to help explain your problem. - render: shell - validations: - required: false - - type: textarea - attributes: - label: Environment - description: | - examples: - - **OS**: Ubuntu 20.04 - - **Python**: 3.9.12 (`python3 --version`) - - **dbt-core**: 1.1.1 (`dbt --version`) - - **dbt-athena**: 1.1.0 (`dbt --version`) - value: | - - OS: - - Python: - - dbt-core: - - dbt-athena: - render: markdown - validations: - required: false - - type: textarea - attributes: - label: Additional Context - description: | - Links? References? Anything that will give us more context about the issue you are encountering! - - Tip: You can attach images or log files by clicking this area to highlight it and then dragging files in. - validations: - required: false diff --git a/.github/ISSUE_TEMPLATE/config.yml b/.github/ISSUE_TEMPLATE/config.yml index db00b654..1f34d4a7 100644 --- a/.github/ISSUE_TEMPLATE/config.yml +++ b/.github/ISSUE_TEMPLATE/config.yml @@ -7,8 +7,11 @@ contact_links: url: mailto:support@getdbt.com about: Are you using dbt Cloud? Contact our support team for help! - name: Participate in Discussions - url: https://github.com/dbt-labs/dbt-snowflake/discussions - about: Do you have a Big Idea for dbt-snowflake? Read open discussions, or start a new one + url: https://github.com/dbt-labs/dbt-adapters/discussions + about: Do you have a Big Idea for dbt-athena? 
Read open discussions, or start a new one + - name: Create an issue for dbt-athena + url: https://github.com/dbt-labs/dbt-adapters/issues/new/choose + about: Report a bug or request a feature for dbt-athena - name: Create an issue for dbt-core url: https://github.com/dbt-labs/dbt-core/issues/new/choose about: Report a bug or request a feature for dbt-core diff --git a/.github/ISSUE_TEMPLATE/feature-request.yml b/.github/ISSUE_TEMPLATE/feature-request.yml deleted file mode 100644 index 2c1d8c4f..00000000 --- a/.github/ISSUE_TEMPLATE/feature-request.yml +++ /dev/null @@ -1,55 +0,0 @@ -name: ✨ Feature -description: Propose a straightforward extension of dbt functionality -title: "[Feature] <title>" -labels: ["feature", "triage"] -body: -- type: markdown - attributes: - value: | - Thanks for taking the time to fill out this feature request! -- type: checkboxes - attributes: - label: Is this your first time submitting a feature request? - description: > - We want to make sure that features are distinct and discoverable, - so that other members of the community can find them and offer their thoughts. - - Issues are the right place to request straightforward extensions of existing dbt functionality. - For "big ideas" about future capabilities of dbt, we ask that you open a - options: - - label: I have searched the existing issues, and I could not find an existing issue for this feature - required: true -- type: textarea - attributes: - label: Describe the feature - description: A clear and concise description of what you want to happen. - validations: - required: true -- type: textarea - attributes: - label: Describe alternatives you've considered - description: | - A clear and concise description of any alternative solutions or features you've considered. - validations: - required: false -- type: textarea - attributes: - label: Who will this benefit? - description: | - What kind of use case will this feature be useful for? - Please be specific and provide examples, this will help us prioritize properly. - validations: - required: false -- type: input - attributes: - label: Are you interested in contributing this feature? - description: Let us know if you want to write some code, and how we can help. - validations: - required: false -- type: textarea - attributes: - label: Anything else? - description: | - Links? References? Anything that will give us more context about the feature you are suggesting! - validations: - required: false diff --git a/.github/ISSUE_TEMPLATE/regression-report.yml b/.github/ISSUE_TEMPLATE/regression-report.yml deleted file mode 100644 index 5c10d2b8..00000000 --- a/.github/ISSUE_TEMPLATE/regression-report.yml +++ /dev/null @@ -1,82 +0,0 @@ -name: ☣️ Regression -description: Report a regression you've observed in a newer version of dbt-athena -title: "[Regression] <title>" -labels: ["bug", "regression", "triage"] -body: - - type: markdown - attributes: - value: | - Thanks for taking the time to fill out this regression report! - - type: checkboxes - attributes: - label: Is this a regression in a recent version of dbt-athena? 
- description: > - A regression is when documented functionality works as expected in an older version of dbt-athena, - and no longer works after upgrading to a newer version of dbt-athena - options: - - label: I believe this is a regression in dbt-athena functionality - required: true - - label: I have searched the existing issues, and I could not find an existing issue for this regression - required: true - - type: textarea - attributes: - label: Current Behavior - description: A concise description of what you're experiencing. - validations: - required: true - - type: textarea - attributes: - label: Expected/Previous Behavior - description: A concise description of what you expected to happen. - validations: - required: true - - type: textarea - attributes: - label: Steps To Reproduce - description: Steps to reproduce the behavior. - placeholder: | - 1. In this environment... - 2. With this config... - 3. Run '...' - 4. See error... - validations: - required: true - - type: textarea - id: logs - attributes: - label: Relevant log output - description: | - If applicable, log output to help explain your problem. - render: shell - validations: - required: false - - type: textarea - attributes: - label: Environment - description: | - examples: - - **OS**: Ubuntu 20.04 - - **Python**: 3.9.12 (`python3 --version`) - - **dbt-core (working version)**: 1.1.1 (`dbt --version`) - - **dbt-athena (working version)**: 1.1.0 (`dbt --version`) - - **dbt-core (regression version)**: 1.2.0 (`dbt --version`) - - **dbt-athena (regression version)**: 1.2.0 (`dbt --version`) - value: | - - OS: - - Python: - - dbt-core (working version): - - dbt-athena (working version): - - dbt-core (regression version): - - dbt-athena (regression version): - render: markdown - validations: - required: true - - type: textarea - attributes: - label: Additional Context - description: | - Links? References? Anything that will give us more context about the issue you are encountering! - - Tip: You can attach images or log files by clicking this area to highlight it and then dragging files in. - validations: - required: false diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md deleted file mode 100644 index 8b0a45f1..00000000 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ /dev/null @@ -1,14 +0,0 @@ -# Description - -<!--- Add a little description on what you plan to tackle with this PR --> -<!--- Add resolve #issue-number in case you resolve an open issue --> - -## Models used to test - Optional -<!--- Add here the models that you use to test the changes --> - -## Checklist - -- [ ] You followed [contributing section](https://github.com/dbt-athena/dbt-athena#contributing) -- [ ] You kept your Pull Request small and focused on a single feature or bug fix. 
-- [ ] You added unit testing when necessary -- [ ] You added functional testing when necessary diff --git a/.github/dependabot.yml b/.github/dependabot.yml deleted file mode 100644 index 6a7695c0..00000000 --- a/.github/dependabot.yml +++ /dev/null @@ -1,6 +0,0 @@ -version: 2 -updates: - - package-ecosystem: "pip" - directory: "/" - schedule: - interval: "weekly" diff --git a/.github/semantic.yml b/.github/semantic.yml deleted file mode 100644 index 2fc06025..00000000 --- a/.github/semantic.yml +++ /dev/null @@ -1,4 +0,0 @@ -# Semantic Commit bot: https://github.com/Ezard/semantic-prs - -# Always validate the PR title, and ignore the commits -titleOnly: true diff --git a/.github/workflows/_integration-tests.yml b/.github/workflows/_integration-tests.yml deleted file mode 100644 index 8adf0b27..00000000 --- a/.github/workflows/_integration-tests.yml +++ /dev/null @@ -1,81 +0,0 @@ -name: "Integration tests" - -on: - workflow_call: - inputs: - package: - description: "Choose the package to test" - type: string - default: "dbt-athena" - branch: - description: "Choose the branch to test" - type: string - default: "main" - repository: - description: "Choose the repository to test, when using a fork" - type: string - default: "dbt-labs/dbt-athena" - os: - description: "Choose the OS to test against" - type: string - default: "ubuntu-22.04" - python-version: - description: "Choose the Python version to test against" - type: string - default: "3.9" - workflow_dispatch: - inputs: - package: - description: "Choose the package to test" - type: choice - options: ["dbt-athena", "dbt-athena-community"] - branch: - description: "Choose the branch to test" - type: string - default: "main" - repository: - description: "Choose the repository to test, when using a fork" - type: string - default: "dbt-labs/dbt-athena" - os: - description: "Choose the OS to test against" - type: string - default: "ubuntu-22.04" - python-version: - description: "Choose the Python version to test against" - type: choice - options: ["3.9", "3.10", "3.11", "3.12"] - -permissions: - id-token: write - contents: read - -env: - DBT_TEST_ATHENA_S3_STAGING_DIR: ${{ vars.DBT_TEST_ATHENA_S3_BUCKET }}/staging/ - DBT_TEST_ATHENA_S3_TMP_TABLE_DIR: ${{ vars.DBT_TEST_ATHENA_S3_BUCKET }}/tmp_tables/ - DBT_TEST_ATHENA_REGION_NAME: ${{ vars.DBT_TEST_ATHENA_REGION_NAME }} - DBT_TEST_ATHENA_DATABASE: awsdatacatalog - DBT_TEST_ATHENA_SCHEMA: dbt-tests - DBT_TEST_ATHENA_WORK_GROUP: athena-dbt-tests - DBT_TEST_ATHENA_THREADS: 16 - DBT_TEST_ATHENA_POLL_INTERVAL: 0.5 - DBT_TEST_ATHENA_NUM_RETRIES: 3 - -jobs: - integration-tests: - runs-on: ${{ inputs.os }} - steps: - - uses: actions/checkout@v4 - with: - ref: ${{ inputs.branch }} - repository: ${{ inputs.repository }} - - uses: actions/setup-python@v5 - with: - python-version: ${{ inputs.python-version }} - - uses: pypa/hatch@install - - uses: aws-actions/configure-aws-credentials@v4 - with: - role-to-assume: arn:aws:iam::${{ secrets.AWS_ACCOUNT_ID }}:role/${{ secrets.ASSUMABLE_ROLE_NAME }} - aws-region: ${{ vars.DBT_TEST_ATHENA_REGION_NAME }} - - run: hatch run integration-tests - working-directory: ./${{ inputs.package }} diff --git a/.github/workflows/issue-triage.yml b/.github/workflows/issue-triage.yml deleted file mode 100644 index 7d2ed8e0..00000000 --- a/.github/workflows/issue-triage.yml +++ /dev/null @@ -1,20 +0,0 @@ -name: "Issue triage" -run-name: "Issue triage - #${{ github.event.issue.number }}: ${{ github.event.issue.title }} - ${{ github.actor }}" - -on: issue_comment - -defaults: - run: 
- shell: bash - -permissions: - issues: write - -jobs: - triage: - if: contains(github.event.issue.labels.*.name, 'awaiting_response') - uses: dbt-labs/actions/.github/workflows/swap-labels.yml@main - with: - add_label: "triage" - remove_label: "awaiting_response" - secrets: inherit diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml deleted file mode 100644 index 5066f1fb..00000000 --- a/.github/workflows/publish.yml +++ /dev/null @@ -1,70 +0,0 @@ -name: "Publish" - -on: - workflow_dispatch: - inputs: - deploy-to: - description: "Choose whether to deploy to test or prod" - type: environment - default: "prod" - branch: - description: "Choose the branch to release from" - type: string - default: "main" - pypi-internal: - description: "Publish Internally" - type: boolean - default: true - pypi-public: - description: "Publish to PyPI" - type: boolean - default: false - -# don't attempt to release the same target in parallel -concurrency: - group: ${{ github.workflow }}-${{ inputs.deploy-to }} - cancel-in-progress: true - -jobs: - unit-tests: - uses: dbt-labs/dbt-adapters/.github/workflows/_unit-tests.yml@main - with: - package: "dbt-athena" - branch: ${{ inputs.branch }} - - integration-tests: - uses: ./.github/workflows/_integration-tests.yml - with: - branch: ${{ inputs.branch }} - repository: ${{ github.repository }} - secrets: inherit - - publish-internal: - if: ${{ inputs.pypi-internal == true }} - needs: [unit-tests, integration-tests] - uses: dbt-labs/dbt-adapters/.github/workflows/_publish-internal.yml@main - with: - package: "dbt-athena" - deploy-to: ${{ inputs.deploy-to }} - branch: ${{ inputs.branch }} - secrets: inherit - - publish-pypi: - if: ${{ inputs.pypi-public == true }} - needs: [unit-tests, integration-tests] - uses: dbt-labs/dbt-adapters/.github/workflows/_publish-pypi.yml@main - with: - package: "dbt-athena" - deploy-to: ${{ inputs.deploy-to }} - branch: ${{ inputs.branch }} - - publish-pypi-dbt-athena-community: - if: ${{ inputs.pypi-public == true }} - # dbt-athena-community is hard pinned to dbt-athena to ensure they are the same - # this means we need to finish publishing dbt-athena before starting to build dbt-athena-community - needs: publish-pypi - uses: dbt-labs/dbt-adapters/.github/workflows/_publish-pypi.yml@main - with: - package: "dbt-athena-community" - deploy-to: ${{ inputs.deploy-to }} - branch: ${{ inputs.branch }} diff --git a/.github/workflows/pull-request-checks.yml b/.github/workflows/pull-request-checks.yml deleted file mode 100644 index c94e52c7..00000000 --- a/.github/workflows/pull-request-checks.yml +++ /dev/null @@ -1,81 +0,0 @@ -name: "Pull request checks" - -on: - pull_request_target: - types: [opened, reopened, synchronize, labeled, unlabeled] - -# only run this once per PR at a time -concurrency: - group: ${{ github.workflow }}-${{ github.event.number }} - cancel-in-progress: true - -jobs: - changelog-entry: - uses: dbt-labs/dbt-adapters/.github/workflows/_changelog-entry-check.yml@main - with: - package: "dbt-athena" - pull-request: ${{ github.event.pull_request.number }} - secrets: inherit - - code-quality: - uses: dbt-labs/dbt-adapters/.github/workflows/_code-quality.yml@main - with: - branch: ${{ github.event.pull_request.head.ref }} - repository: ${{ github.event.pull_request.head.repo.full_name }} - - verify-builds: - uses: dbt-labs/dbt-adapters/.github/workflows/_verify-build.yml@main - strategy: - fail-fast: false - matrix: - package: ["dbt-athena", "dbt-athena-community"] - os: [ubuntu-22.04] - python-version: 
["3.9", "3.10", "3.11", "3.12"] - with: - package: ${{ matrix.package }} - branch: ${{ github.event.pull_request.head.ref }} - repository: ${{ github.event.pull_request.head.repo.full_name }} - os: ${{ matrix.os }} - python-version: ${{ matrix.python-version }} - - unit-tests: - uses: dbt-labs/dbt-adapters/.github/workflows/_unit-tests.yml@main - strategy: - fail-fast: false - matrix: - package: ["dbt-athena", "dbt-athena-community"] - os: [ubuntu-22.04] - python-version: ["3.9", "3.10", "3.11", "3.12"] - with: - package: ${{ matrix.package }} - branch: ${{ github.event.pull_request.head.ref }} - repository: ${{ github.event.pull_request.head.repo.full_name }} - os: ${{ matrix.os }} - python-version: ${{ matrix.python-version }} - - integration-tests: - uses: ./.github/workflows/_integration-tests.yml - strategy: - fail-fast: false - matrix: - package: ["dbt-athena", "dbt-athena-community"] - os: [ubuntu-22.04] - python-version: ["3.9", "3.10", "3.11", "3.12"] - with: - package: ${{ matrix.package }} - branch: ${{ github.event.pull_request.head.ref }} - repository: ${{ github.event.pull_request.head.repo.full_name }} - os: ${{ matrix.os }} - python-version: ${{ matrix.python-version }} - secrets: inherit - - # This job does nothing and is only used for branch protection - results: - name: "Pull request checks" - if: always() - needs: [code-quality, changelog-entry, verify-builds, unit-tests, integration-tests] - runs-on: ${{ vars.DEFAULT_RUNNER }} - steps: - - uses: re-actors/alls-green@release/v1 - with: - jobs: ${{ toJSON(needs) }} diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index e73dabdc..12a75195 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -1,126 +1,8 @@ # Contributing -## Requirements +This repository has moved into the `dbt-labs/dbt-adapters` monorepo found +[here](https://www.github.com/dbt-labs/dbt-adapters). +Please refer to that repo for a guide on how to contribute to `dbt-athena`. -* Python>=3.9 - [docs](https://www.python.org/) -* Hatch - [docs](https://hatch.pypa.io/dev/) - -## Getting started - -### Hatch - -This repository uses `hatch` as its primary development tool. -`hatch` will store its virtual environments in its own user space unless you configure it. -We strongly recommend that you configure `hatch` to store its virtual environments in an explicit location. -This has two benefits: - -* this path is predictable and easily discoverable, making it much easier to use with IDEs -* the default environment uses a hash for the name whereas the explicit environment will use -a predictable and human-readable name - -For example, we configure `hatch` to store its virtual environments in the project itself (first option below). -This is akin to running `python -m venv venv` from the project root. -Many folks prefer to store virtual environments in a central location separate from the project (second option below). - -```toml -# MacOS : ~/Library/Application Support/hatch/config.toml -# Windows : %USERPROFILE%\AppData\Local\hatch\config.toml -# Unix : ~.config/hatch/config.toml - -# this will create the virtual environment at `dbt-athena/dbt-athena/.hatch/dbt-athena -[dirs.env] -virtual = ".hatch" - -# this will create the virtual environment at `~/.hatch/dbt-athena` -[dirs.env] -virtual = "~/.hatch" -``` - -You can find the full docs [here](https://hatch.pypa.io/dev/config/hatch/) if you'd like to learn more about `hatch`. - -### Initial setup - -You will need to perform these steps the first time you contribute. 
-If you plan on contributing in the future (we would really appreciate that!), -most of this should persist and be reusable at that point in time. - -<!-- markdownlint-disable MD013 --> -* Fork the `dbt-athena` repo into your own user space on GitHub - [docs](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/fork-a-repo) -* Install `hatch` on your local machine - [docs](https://hatch.pypa.io/dev/install/) -* Clone the fork to your local machine - [docs](https://docs.github.com/en/repositories/creating-and-managing-repositories/cloning-a-repository) -* Navigate to the `dbt-athena` package directory - * There are two packages in this repository. Don't worry about `dbt-athena-community`, - it will automatically remain in sync with `dbt-athena` -* Setup your development environment with `hatch run setup`: - 1. Create a `hatch` virtual environment - 2. Install all dependencies - 3. Install pre-commit hooks - 4. Create a `test.env` stub file (formerly `.env`) -* Adjust the `test.env` file by configuring the environment variables to match your Athena development environment -<!-- markdownlint-restore --> - -```shell -# install `hatch` -pip install hatch - -# clone your fork -git clone https://github.com/<user>/dbt-athena - -# navigate to the dbt-athena package -cd dbt-athena - -# setup your development environment (formerly `make setup`) -hatch run setup -``` - -## Running tests and checks - -There are many checks that are collectively referred to as Code Quality checks as well as 2 different types of testing: - -* **code quality checks**: these checks include static analysis, type checking, and other code quality assurances -* **unit testing**: these tests are fast tests that don't require a platform connection -* **integration testing**: these tests are more thorough and require an AWS account with an Athena instance configured - * Details of the Athena instance also need to be configured in your `test.env` file - -These tests and checks can be run as follows: - -```shell -# run all pre-commit checks -hatch run code-quality - -# run unit tests (formerly `make unit_test`) -hatch run unit-tests - -# run integration tests (formerly `make integration_test`) -hatch run integration-tests - -# run unit tests and integration tests, formerly `make test` -hatch run all-tests - -# run specific integration tests -hatch run integration-tests tests/functional/my/test_file.py -``` - -## Submitting a pull request - -<!-- markdownlint-disable MD013 --> -* Create a commit with your changes and push them back up to your fork (e.g. 
`https://github.com/<user>/dbt-athena`) -* Create a [pull request](https://github.com/dbt-labs/dbt-athena/compare) on GitHub - [docs](https://docs.github.com/en/github/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork) - * The pull request title and commit messages should adhere to [conventional commits](https://www.conventionalcommits.org) - * The pull request body should describe _motivation_ -<!-- markdownlint-restore --> - -### General Guidelines - -* Keep your Pull Request small and focused on a single feature or bug fix -* Make sure your change is well tested - * Add new tests for completely new features or bug fixes - * Add scenarios to existing tests if extending a feature -* Make sure your change is well documented - * Mention when something is not obvious, or is being used for a specific purpose - * Provide a link to the GitHub bug in the docstring when writing a new test demonstrating the bug -* Provide a clear description in your pull request to allow the reviewer to understand the context of your changes - * Use a "self-review" to walk the reviewer through your thought process in a specific area - * Use a "self-review" to ask a question about how to handle a specific problem -* Use a draft pull request during development and mark it as Ready for Review when you're ready - * Ideally CI is also passing at this point, but you may also be looking for feedback on how to resolve an issue +If you have already opened a pull request and need to migrate it to the new repo, please refer to the +[contributing guide](https://github.com/dbt-labs/dbt-adapters/blob/main/CONTRIBUTING.md#submitting-a-pull-request). diff --git a/README.md b/README.md index c5487c05..99e2ec02 100644 --- a/README.md +++ b/README.md @@ -21,862 +21,5 @@ </a> </p> -<!-- TOC --> -- [Features](#features) - - [Quick start](#quick-start) - - [Installation](#installation) - - [Prerequisites](#prerequisites) - - [Credentials](#credentials) - - [Configuring your profile](#configuring-your-profile) - - [Additional information](#additional-information) - - [Models](#models) - - [Table configuration](#table-configuration) - - [Table location](#table-location) - - [Incremental models](#incremental-models) - - [On schema change](#on-schema-change) - - [Iceberg](#iceberg) - - [Highly available table (HA)](#highly-available-table-ha) - - [HA known issues](#ha-known-issues) - - [Update glue data catalog](#update-glue-data-catalog) - - [Snapshots](#snapshots) - - [Timestamp strategy](#timestamp-strategy) - - [Check strategy](#check-strategy) - - [Hard-deletes](#hard-deletes) - - [Working example](#working-example) - - [Snapshots known issues](#snapshots-known-issues) - - [AWS Lake Formation integration](#aws-lake-formation-integration) - - [Python models](#python-models) - - [Contracts](#contracts) - - [Contributing](#contributing) - - [Contributors ✨](#contributors-) -<!-- TOC --> - -# Features - -- Supports dbt version `1.7.*` -- Support for Python -- Supports [seeds][seeds] -- Correctly detects views and their columns -- Supports [table materialization][table] - - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location** - (see table location section below) - - Hive tables are supported by both Athena engines -- Supports [incremental models][incremental] - - On Iceberg tables: - - Supports the use of `unique_key` only with the `merge` strategy - - Supports the `append` strategy - - On Hive tables: - - Supports 
two incremental update strategies: `insert_overwrite` and `append` - - Does **not** support the use of `unique_key` -- Supports [snapshots][snapshots] -- Supports [Python models][python-models] - -[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds - -[incremental]: https://docs.getdbt.com/docs/build/incremental-models - -[table]: https://docs.getdbt.com/docs/build/materializations#table - -[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models - -[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html - -[snapshots]: https://docs.getdbt.com/docs/build/snapshots - -## Quick start - -### Installation - -- `pip install dbt-athena-community` -- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git` - -### Prerequisites - -To start, you will need an S3 bucket, for instance `my-bucket` and an Athena database: - -```sql -CREATE DATABASE IF NOT EXISTS analytics_dev -COMMENT 'Analytics models generated by dbt (development)' -LOCATION 's3://my-bucket/' -WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com'); -``` - -Notes: - -- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.). -- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena - databases. - -### Credentials - -Credentials can be passed directly to the adapter, or they can -be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based -on `aws cli`/`boto3` conventions. -You can either: - -- Configure `aws_access_key_id` and `aws_secret_access_key` -- Configure `aws_profile_name` to match a profile defined in your AWS credentials file. - Checkout dbt profile configuration below for details. - -### Configuring your profile - -A dbt profile can be configured to run against AWS Athena using the following configuration: - -| Option | Description | Required? 
| Example | -|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------| -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | -| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | -| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` | -| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` | -| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` | -| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` | -| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | `true` | -| num_retries | Number of times to retry a failing query | Optional | `3` | -| num_boto3_retries | Number of times to retry boto3 requests (e.g. 
deleting S3 files for materialized tables) | Optional | `5` | -| num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` | -| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` | -| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` | -| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` | - -**Example profiles.yml entry:** - -```yaml -athena: - target: dev - outputs: - dev: - type: athena - s3_staging_dir: s3://athena-query-results/dbt/ - s3_data_dir: s3://your_s3_bucket/dbt/ - s3_data_naming: schema_table - s3_tmp_table_dir: s3://your_s3_bucket/temp/ - region_name: eu-west-1 - schema: dbt - database: awsdatacatalog - threads: 4 - aws_profile_name: my-profile - work_group: my-workgroup - spark_work_group: my-spark-workgroup - seed_s3_upload_args: - ACL: bucket-owner-full-control -``` - -### Additional information - -- `threads` is supported -- `database` and `catalog` can be used interchangeably - -## Models - -### Table configuration - -- `external_location` (`default=none`) - - If set, the full S3 path to which the table will be saved - - Works only with incremental models - - Does not work with Hive table with `ha` set to true -- `partitioned_by` (`default=none`) - - An array list of columns by which the table will be partitioned - - Limited to creation of 100 partitions (*currently*) -- `bucketed_by` (`default=none`) - - An array list of columns to bucket data, ignored if using Iceberg -- `bucket_count` (`default=none`) - - The number of buckets for bucketing your data, ignored if using Iceberg -- `table_type` (`default='hive'`) - - The type of table - - Supports `hive` or `iceberg` -- `ha` (`default=false`) - - If the table should be built using the high-availability method. This option is only available for Hive tables - since it is by default for Iceberg tables (see the section [below](#highly-available-table-ha)) -- `format` (`default='parquet'`) - - The data format for the table - - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE` -- `write_compression` (`default=none`) - - The compression type to use for any storage format that allows compression to be specified. To see which options are - available, check out [CREATE TABLE AS][create-table-as] -- `field_delimiter` (`default=none`) - - Custom field delimiter, for when format is set to `TEXTFILE` -- `table_properties`: table properties to add to the table, valid for Iceberg only -- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be - made to manage data in S3. Data in S3 will only be cleared up for Iceberg - tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that - Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds. -- `seed_by_insert` (`default=false`) - - Default behaviour uploads seed data to S3. 
This flag will create seeds using an SQL insert statement - - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would - exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html) -- `force_batch` (`default=false`) - - Skip creating the table as CTAS and run the operation directly in batch insert mode - - This is particularly useful when the standard table creation process fails due to partition limitations, - allowing you to work with temporary tables and persist the dataset more efficiently -- `unique_tmp_table_suffix` (`default=false`) - - For incremental models using insert overwrite strategy on hive table - - Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid - - Useful if you are looking to run multiple dbt build inserting in the same table in parallel -- `temp_schema` (`default=none`) - - For incremental models, it allows to define a schema to hold temporary create statements - used in incremental model runs - - Schema will be created in the model target database if does not exist -- `lf_tags_config` (`default=none`) - - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns - - `enabled` (`default=False`) whether LF tags management is enabled for a model - - `tags` dictionary with tags and their values to assign for the model - - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to - - `lf_inherited_tags` (`default=none`) - - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be - removed during association of those defined in `lf_tags_config` - - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from - tables and columns before associating the ones currently defined for a given model - - This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly - -```sql -{{ - config( - materialized='incremental', - incremental_strategy='append', - on_schema_change='append_new_columns', - table_type='iceberg', - schema='test_schema', - lf_tags_config={ - 'enabled': true, - 'tags': { - 'tag1': 'value1', - 'tag2': 'value2' - }, - 'tags_columns': { - 'tag1': { - 'value1': ['column1', 'column2'], - 'value2': ['column3', 'column4'] - } - }, - 'inherited_tags': ['tag1', 'tag2'] - } - ) -}} -``` - -- Format for `dbt_project.yml`: - -```yaml - +lf_tags_config: - enabled: true - tags: - tag1: value1 - tag2: value2 - tags_columns: - tag1: - value1: [ column1, column2 ] - inherited_tags: [ tag1, tag2 ] -``` - -- `lf_grants` (`default=none`) - - Lake Formation grants config for data_cell filters - - Format: - - ```python - lf_grants={ - 'data_cell_filters': { - 'enabled': True | False, - 'filters': { - 'filter_name': { - 'row_filter': '<filter_condition>', - 'principals': ['principal_arn1', 'principal_arn2'] - } - } - } - } - ``` - -> Notes: -> -> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. -> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use -> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or -> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. 
-> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table -> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every -> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and -> apply changes if they occur: drop, create, update filters and their permissions. -> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at - the table and column level -> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made -> previously -> - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed - prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform - where the inherited value is configured nor in the DBT project where the override previously existed but now is - gone) - -[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties - -### Table location - -The location a table is saved to is determined by: - -1. If `external_location` is defined, that value is used -2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` -3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/` - -Here all the options available for `s3_data_naming`: - -- `unique`: `{s3_data_dir}/{uuid4()}/` -- `table`: `{s3_data_dir}/{table}/` -- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` -- `schema_table`: `{s3_data_dir}/{schema}/{table}/` -- `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` - -It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, -or setting up the value for groups of model in dbt_project.yml. - -> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets -> are ignored and the location configured in the workgroup is used. - -### Incremental models - -Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-models). - -These strategies are supported: - -- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination - table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no - partitions are defined, dbt will fall back to the `append` strategy. -- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate - data (e.g. great for log or historical data). -- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`. - Only available when using Iceberg. - -### On schema change - -`on_schema_change` is an option to reflect changes of schema in incremental models. -The following options are supported: - -- `ignore` (default) -- `fail` -- `append_new_columns` -- `sync_all_columns` - -For details, please refer -to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change). - -### Iceberg - -The adapter supports table materialization for Iceberg. 
- -To get started just add this as your model: - -```sql -{{ config( - materialized='table', - table_type='iceberg', - format='parquet', - partitioned_by=['bucket(user_id, 5)'], - table_properties={ - 'optimize_rewrite_delete_file_threshold': '2' - } -) }} - -select 'A' as user_id, - 'pi' as name, - 'active' as status, - 17.89 as cost, - 1 as quantity, - 100000000 as quantity_big, - current_date as my_date -``` - -Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing -conditions. - -Iceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`. - -It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported: - -- `append`: New records are appended to the table, this can lead to duplicates. -- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only - available with Athena engine version 3. - - `unique_key` **(required)**: columns that define a unique record in the source and target tables. - - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can - be useful for improving performance via predicate pushdown on the target table. - - `delete_condition` (optional): SQL condition used to identify records that should be deleted. - - `update_condition` (optional): SQL condition used to identify records that should be updated. - - `insert_condition` (optional): SQL condition used to identify records that should be inserted. - - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of - the incremental table (`src`) or the final table (`target`). - Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error. - -`delete_condition` example: - -```sql -{{ config( - materialized='incremental', - table_type='iceberg', - incremental_strategy='merge', - unique_key='user_id', - incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"], - delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year", - format='parquet' -) }} - -select 'A' as user_id, - 'pi' as name, - 'active' as status, - 17.89 as cost, - 1 as quantity, - 100000000 as quantity_big, - current_date as my_date -``` - -`update_condition` example: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key=['id'], - update_condition='target.id > 1', - schema='sandbox' - ) -}} - -{% if is_incremental() %} - -select * from ( - values - (1, 'v1-updated') - , (2, 'v2-updated') -) as t (id, value) - -{% else %} - -select * from ( - values - (-1, 'v-1') - , (0, 'v0') - , (1, 'v1') - , (2, 'v2') -) as t (id, value) - -{% endif %} -``` - -`insert_condition` example: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key=['id'], - insert_condition='target.status != 0', - schema='sandbox' - ) -}} - -select * from ( - values - (1, 0) - , (2, 1) -) as t (id, status) - -``` - -### Highly available table (HA) - -The current implementation of the table materialization can lead to downtime, as the target table is -dropped and re-created. To have the less destructive behavior it's possible to use the `ha` config on -your `table` materialized models. It leverages the table versions feature of glue catalog, creating -a temp table and swapping the target table to the location of the temp table. 
This materialization -is only available for `table_type=hive` and requires using unique locations. For iceberg, high -availability is the default. - -```sql -{{ config( - materialized='table', - ha=true, - format='parquet', - table_type='hive', - partitioned_by=['status'], - s3_data_naming='table_unique' -) }} - -select 'a' as user_id, - 'pi' as user_name, - 'active' as status -union all -select 'b' as user_id, - 'sh' as user_name, - 'disabled' as status -``` - -By default, the materialization keeps the last 4 table versions, you can change it by setting `versions_to_keep`. - -#### HA known issues - -- When swapping from a table with partitions to a table without (and the other way around), there could be a little - downtime. - If high performances is needed consider bucketing instead of partitions -- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location -- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed - -### Update glue data catalog - -Optionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as -[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties) -and [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html). -By default, documentation persistence is disabled, but it can be enabled for specific resources or -groups of resources as needed. - -For example: - -```yaml -models: - - name: test_deduplicate - description: another value - config: - persist_docs: - relation: true - columns: true - meta: - test: value - columns: - - name: id - meta: - primary_key: true -``` - -See [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details. - -## Snapshots - -The adapter supports snapshot materialization. It supports both timestamp and check strategy. To create a snapshot -create a snapshot file in the snapshots directory. If the directory does not exist create one. - -### Timestamp strategy - -To use the timestamp strategy refer to -the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended) - -### Check strategy - -To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy) - -### Hard-deletes - -The materialization also supports invalidating hard deletes. Check -the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage. 
- -### Working example - -seed file - employent_indicators_november_2022_csv_tables.csv - -```csv -Series_reference,Period,Data_value,Suppressed -MEIM.S1WA,1999.04,80267, -MEIM.S1WA,1999.05,70803, -MEIM.S1WA,1999.06,65792, -MEIM.S1WA,1999.07,66194, -MEIM.S1WA,1999.08,67259, -MEIM.S1WA,1999.09,69691, -MEIM.S1WA,1999.1,72475, -MEIM.S1WA,1999.11,79263, -MEIM.S1WA,1999.12,86540, -MEIM.S1WA,2000.01,82552, -MEIM.S1WA,2000.02,81709, -MEIM.S1WA,2000.03,84126, -MEIM.S1WA,2000.04,77089, -MEIM.S1WA,2000.05,73811, -MEIM.S1WA,2000.06,70070, -MEIM.S1WA,2000.07,69873, -MEIM.S1WA,2000.08,71468, -MEIM.S1WA,2000.09,72462, -MEIM.S1WA,2000.1,74897, -``` - -model.sql - -```sql -{{ config( - materialized='table' -) }} - -select row_number() over() as id - , * - , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp -from {{ ref('employment_indicators_november_2022_csv_tables') }} -``` - -timestamp strategy - model_snapshot_1 - -```sql -{% snapshot model_snapshot_1 %} - -{{ - config( - strategy='timestamp', - updated_at='refresh_timestamp', - unique_key='id' - ) -}} - -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -invalidate hard deletes - model_snapshot_2 - -```sql -{% snapshot model_snapshot_2 %} - -{{ - config - ( - unique_key='id', - strategy='timestamp', - updated_at='refresh_timestamp', - invalidate_hard_deletes=True, - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -check strategy - model_snapshot_3 - -```sql -{% snapshot model_snapshot_3 %} - -{{ - config - ( - unique_key='id', - strategy='check', - check_cols=['series_reference','data_value'] - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -### Snapshots known issues - -- Incremental Iceberg models - Sync all columns on schema change can't remove columns used for partitioning. - The only way, from a dbt perspective, is to do a full-refresh of the incremental model. - -- Tables, schemas and database names should only be lowercase - -- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not - installed in the target environment. - See <https://github.com/dbt-athena/dbt-athena/issues/103> for more details. - -- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column - from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history - -## AWS Lake Formation integration - -The adapter implements AWS Lake Formation tags management in the following way: - -- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default) -- Once you enable the feature, lf-tags will be updated on every dbt run -- First, all lf-tags for columns are removed to avoid inheritance issues -- Then, all redundant lf-tags are removed from tables and actual tags from table configs are applied -- Finally, lf-tags for columns are applied - -It's important to understand the following points: - -- dbt does not manage lf-tags for databases -- dbt does not manage Lake Formation permissions - -That's why you should handle this by yourself manually or using an automation tool like terraform, AWS CDK etc. 
-You may find the following links useful to manage that: - -<!-- markdownlint-disable --> -* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) -* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags) -<!-- markdownlint-restore --> - -## Python models - -The adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). - -### Setup - -- A Spark-enabled workgroup created in Athena -- Spark execution role granted access to Athena, Glue and S3 -- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used - is referenced in `dbt_project.yml` - -### Spark-specific table configuration - -- `timeout` (`default=43200`) - - Time out in seconds for each Python model execution. Defaults to 12 hours/43200 seconds. -- `spark_encryption` (`default=false`) - - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored - locally by Spark. -- `spark_cross_account_catalog` (`default=false`) - - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same - AWS account by default. However, sometimes you want to query another catalog located on an external AWS - account. Setting this additional Spark properties parameter to true will enable querying external catalogs. - You can use the syntax `external_catalog_id/database.table` to access the external table on the external - catalog (ex: `999999999999/mydatabase.cloudfront_logs` where 999999999999 is the external catalog ID) -- `spark_requester_pays` (`default=false`) - - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for - data access and data transfer fees associated with the query. - - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark. - -### Spark notes - -- A session is created for each unique engine configuration defined in the models that are part of the invocation. -- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation - (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session. -- The number of Python models running at a time depends on the `threads`. The number of sessions created for the - entire run depends on the number of unique engine configurations and the availability of sessions to maintain - thread concurrency. -- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2. - This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark. 
- -### Example models - -#### Simple pandas model - -```python -import pandas as pd - - -def model(dbt, session): - dbt.config(materialized="table") - - model_df = pd.DataFrame({"A": [1, 2, 3, 4]}) - - return model_df -``` - -#### Simple spark - -```python -def model(dbt, spark_session): - dbt.config(materialized="table") - - data = [(1,), (2,), (3,), (4,)] - - df = spark_session.createDataFrame(data, ["A"]) - - return df -``` - -#### Spark incremental - -```python -def model(dbt, spark_session): - dbt.config(materialized="incremental") - df = dbt.ref("model") - - if dbt.is_incremental: - max_from_this = ( - f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}" - ) - df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0]) - - return df -``` - -#### Config spark model - -```python -def model(dbt, spark_session): - dbt.config( - materialized="table", - engine_config={ - "CoordinatorDpuSize": 1, - "MaxConcurrentDpus": 3, - "DefaultExecutorDpuSize": 1 - }, - spark_encryption=True, - spark_cross_account_catalog=True, - spark_requester_pays=True - polling_interval=15, - timeout=120, - ) - - data = [(1,), (2,), (3,), (4,)] - - df = spark_session.createDataFrame(data, ["A"]) - - return df -``` - -#### Create pySpark udf using imported external python files - -```python -def model(dbt, spark_session): - dbt.config( - materialized="incremental", - incremental_strategy="merge", - unique_key="num", - ) - sc = spark_session.sparkContext - sc.addPyFile("s3://athena-dbt/test/file1.py") - sc.addPyFile("s3://athena-dbt/test/file2.py") - - def func(iterator): - from file2 import transform - - return [transform(i) for i in iterator] - - from pyspark.sql.functions import udf - from pyspark.sql.functions import col - - udf_with_import = udf(func) - - data = [(1, "a"), (2, "b"), (3, "c")] - cols = ["num", "alpha"] - df = spark_session.createDataFrame(data, cols) - - return df.withColumn("udf_test_col", udf_with_import(col("alpha"))) -``` - -### Known issues in Python models - -- Python models cannot - [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). -- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list] - or [imported manually][imported manually]. -- Python models can only reference or write to tables with names meeting the - regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not - supported by Spark, even though Athena supports them. -- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which - runs on Trino. -- Snapshot materializations are not supported. -- Spark can only reference tables within the same catalog. -- For tables created outside of the dbt tool, be sure to populate the location field or dbt will throw an error -when trying to create the table. - -[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html -[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html - -## Contracts - -The adapter partly supports contract definitions: - -- `data_type` is supported but needs to be adjusted for complex types. Types must be specified - entirely (for instance `array<int>`) even though they won't be checked. Indeed, as dbt recommends, we only compare - the broader type (array, map, int, varchar). 
The complete definition is used in order to check that the data types - defined in Athena are ok (pre-flight check). -- The adapter does not support the constraints since there is no constraint concept in Athena. - -## Contributing - -See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project. - -## Contributors ✨ - -Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): - -<a href="https://github.com/dbt-athena/dbt-athena/graphs/contributors"> - <img src="https://contrib.rocks/image?repo=dbt-athena/dbt-athena" /> -</a> - -Contributions of any kind welcome! +This repository has moved into the `dbt-labs/dbt-adapters` monorepo found +[here](https://www.github.com/dbt-labs/dbt-adapters). diff --git a/RELEASING.md b/RELEASING.md deleted file mode 100644 index 6abd975f..00000000 --- a/RELEASING.md +++ /dev/null @@ -1,11 +0,0 @@ -# How to release - -* Open a pull request with a manual bump on `dbt-athena/dbt/adapters/athena/__version__.py` -* Create a new release from <https://github.com/dbt-labs/dbt-athena/releases> - * Be sure to use the same version as in the `__version__.py` file - * Be sure to start the release with `v` e.g. v1.6.3 - * Tag with the same name of the release e.g. v1.6.3 - * Be sure to clean up release notes grouping by semantic commit type, - e.g. all feat commits should under the same section -* Once the new release is made be sure that the new package version is available on PyPI - in [PyPI](https://pypi.org/project/dbt-athena/) diff --git a/dbt-athena-community/README.md b/dbt-athena-community/README.md index c5487c05..99e2ec02 100644 --- a/dbt-athena-community/README.md +++ b/dbt-athena-community/README.md @@ -21,862 +21,5 @@ </a> </p> -<!-- TOC --> -- [Features](#features) - - [Quick start](#quick-start) - - [Installation](#installation) - - [Prerequisites](#prerequisites) - - [Credentials](#credentials) - - [Configuring your profile](#configuring-your-profile) - - [Additional information](#additional-information) - - [Models](#models) - - [Table configuration](#table-configuration) - - [Table location](#table-location) - - [Incremental models](#incremental-models) - - [On schema change](#on-schema-change) - - [Iceberg](#iceberg) - - [Highly available table (HA)](#highly-available-table-ha) - - [HA known issues](#ha-known-issues) - - [Update glue data catalog](#update-glue-data-catalog) - - [Snapshots](#snapshots) - - [Timestamp strategy](#timestamp-strategy) - - [Check strategy](#check-strategy) - - [Hard-deletes](#hard-deletes) - - [Working example](#working-example) - - [Snapshots known issues](#snapshots-known-issues) - - [AWS Lake Formation integration](#aws-lake-formation-integration) - - [Python models](#python-models) - - [Contracts](#contracts) - - [Contributing](#contributing) - - [Contributors ✨](#contributors-) -<!-- TOC --> - -# Features - -- Supports dbt version `1.7.*` -- Support for Python -- Supports [seeds][seeds] -- Correctly detects views and their columns -- Supports [table materialization][table] - - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location** - (see table location section below) - - Hive tables are supported by both Athena engines -- Supports [incremental models][incremental] - - On Iceberg tables: - - Supports the use of `unique_key` only with the `merge` strategy - - Supports the `append` strategy - - On Hive tables: - - Supports two incremental update strategies: `insert_overwrite` and `append` - - Does
**not** support the use of `unique_key` -- Supports [snapshots][snapshots] -- Supports [Python models][python-models] - -[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds - -[incremental]: https://docs.getdbt.com/docs/build/incremental-models - -[table]: https://docs.getdbt.com/docs/build/materializations#table - -[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models - -[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html - -[snapshots]: https://docs.getdbt.com/docs/build/snapshots - -## Quick start - -### Installation - -- `pip install dbt-athena-community` -- Or `pip install git+https://github.com/dbt-athena/dbt-athena.git` - -### Prerequisites - -To start, you will need an S3 bucket, for instance `my-bucket` and an Athena database: - -```sql -CREATE DATABASE IF NOT EXISTS analytics_dev -COMMENT 'Analytics models generated by dbt (development)' -LOCATION 's3://my-bucket/' -WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com'); -``` - -Notes: - -- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.). -- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena - databases. - -### Credentials - -Credentials can be passed directly to the adapter, or they can -be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based -on `aws cli`/`boto3` conventions. -You can either: - -- Configure `aws_access_key_id` and `aws_secret_access_key` -- Configure `aws_profile_name` to match a profile defined in your AWS credentials file. - Checkout dbt profile configuration below for details. - -### Configuring your profile - -A dbt profile can be configured to run against AWS Athena using the following configuration: - -| Option | Description | Required? 
| Example | -|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------| -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | -| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | -| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` | -| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` | -| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` | -| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` | -| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | `true` | -| num_retries | Number of times to retry a failing query | Optional | `3` | -| num_boto3_retries | Number of times to retry boto3 requests (e.g. 
deleting S3 files for materialized tables) | Optional | `5` | -| num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` | -| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` | -| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` | -| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` | - -**Example profiles.yml entry:** - -```yaml -athena: - target: dev - outputs: - dev: - type: athena - s3_staging_dir: s3://athena-query-results/dbt/ - s3_data_dir: s3://your_s3_bucket/dbt/ - s3_data_naming: schema_table - s3_tmp_table_dir: s3://your_s3_bucket/temp/ - region_name: eu-west-1 - schema: dbt - database: awsdatacatalog - threads: 4 - aws_profile_name: my-profile - work_group: my-workgroup - spark_work_group: my-spark-workgroup - seed_s3_upload_args: - ACL: bucket-owner-full-control -``` - -### Additional information - -- `threads` is supported -- `database` and `catalog` can be used interchangeably - -## Models - -### Table configuration - -- `external_location` (`default=none`) - - If set, the full S3 path to which the table will be saved - - Works only with incremental models - - Does not work with Hive table with `ha` set to true -- `partitioned_by` (`default=none`) - - An array list of columns by which the table will be partitioned - - Limited to creation of 100 partitions (*currently*) -- `bucketed_by` (`default=none`) - - An array list of columns to bucket data, ignored if using Iceberg -- `bucket_count` (`default=none`) - - The number of buckets for bucketing your data, ignored if using Iceberg -- `table_type` (`default='hive'`) - - The type of table - - Supports `hive` or `iceberg` -- `ha` (`default=false`) - - If the table should be built using the high-availability method. This option is only available for Hive tables - since it is by default for Iceberg tables (see the section [below](#highly-available-table-ha)) -- `format` (`default='parquet'`) - - The data format for the table - - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE` -- `write_compression` (`default=none`) - - The compression type to use for any storage format that allows compression to be specified. To see which options are - available, check out [CREATE TABLE AS][create-table-as] -- `field_delimiter` (`default=none`) - - Custom field delimiter, for when format is set to `TEXTFILE` -- `table_properties`: table properties to add to the table, valid for Iceberg only -- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be - made to manage data in S3. Data in S3 will only be cleared up for Iceberg - tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that - Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds. -- `seed_by_insert` (`default=false`) - - Default behaviour uploads seed data to S3. 
This flag will create seeds using an SQL insert statement - - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would - exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html) -- `force_batch` (`default=false`) - - Skip creating the table as CTAS and run the operation directly in batch insert mode - - This is particularly useful when the standard table creation process fails due to partition limitations, - allowing you to work with temporary tables and persist the dataset more efficiently -- `unique_tmp_table_suffix` (`default=false`) - - For incremental models using insert overwrite strategy on hive table - - Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid - - Useful if you are looking to run multiple dbt build inserting in the same table in parallel -- `temp_schema` (`default=none`) - - For incremental models, it allows to define a schema to hold temporary create statements - used in incremental model runs - - Schema will be created in the model target database if does not exist -- `lf_tags_config` (`default=none`) - - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns - - `enabled` (`default=False`) whether LF tags management is enabled for a model - - `tags` dictionary with tags and their values to assign for the model - - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to - - `lf_inherited_tags` (`default=none`) - - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be - removed during association of those defined in `lf_tags_config` - - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from - tables and columns before associating the ones currently defined for a given model - - This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly - -```sql -{{ - config( - materialized='incremental', - incremental_strategy='append', - on_schema_change='append_new_columns', - table_type='iceberg', - schema='test_schema', - lf_tags_config={ - 'enabled': true, - 'tags': { - 'tag1': 'value1', - 'tag2': 'value2' - }, - 'tags_columns': { - 'tag1': { - 'value1': ['column1', 'column2'], - 'value2': ['column3', 'column4'] - } - }, - 'inherited_tags': ['tag1', 'tag2'] - } - ) -}} -``` - -- Format for `dbt_project.yml`: - -```yaml - +lf_tags_config: - enabled: true - tags: - tag1: value1 - tag2: value2 - tags_columns: - tag1: - value1: [ column1, column2 ] - inherited_tags: [ tag1, tag2 ] -``` - -- `lf_grants` (`default=none`) - - Lake Formation grants config for data_cell filters - - Format: - - ```python - lf_grants={ - 'data_cell_filters': { - 'enabled': True | False, - 'filters': { - 'filter_name': { - 'row_filter': '<filter_condition>', - 'principals': ['principal_arn1', 'principal_arn2'] - } - } - } - } - ``` - -> Notes: -> -> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. -> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use -> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or -> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. 
-> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table -> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every -> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and -> apply changes if they occur: drop, create, update filters and their permissions. -> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at - the table and column level -> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made -> previously -> - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed - prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform - where the inherited value is configured nor in the DBT project where the override previously existed but now is - gone) - -[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties - -### Table location - -The location a table is saved to is determined by: - -1. If `external_location` is defined, that value is used -2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` -3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/` - -Here all the options available for `s3_data_naming`: - -- `unique`: `{s3_data_dir}/{uuid4()}/` -- `table`: `{s3_data_dir}/{table}/` -- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` -- `schema_table`: `{s3_data_dir}/{schema}/{table}/` -- `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` - -It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, -or setting up the value for groups of model in dbt_project.yml. - -> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets -> are ignored and the location configured in the workgroup is used. - -### Incremental models - -Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-models). - -These strategies are supported: - -- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination - table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no - partitions are defined, dbt will fall back to the `append` strategy. -- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate - data (e.g. great for log or historical data). -- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`. - Only available when using Iceberg. - -### On schema change - -`on_schema_change` is an option to reflect changes of schema in incremental models. -The following options are supported: - -- `ignore` (default) -- `fail` -- `append_new_columns` -- `sync_all_columns` - -For details, please refer -to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change). - -### Iceberg - -The adapter supports table materialization for Iceberg. 
- -To get started just add this as your model: - -```sql -{{ config( - materialized='table', - table_type='iceberg', - format='parquet', - partitioned_by=['bucket(user_id, 5)'], - table_properties={ - 'optimize_rewrite_delete_file_threshold': '2' - } -) }} - -select 'A' as user_id, - 'pi' as name, - 'active' as status, - 17.89 as cost, - 1 as quantity, - 100000000 as quantity_big, - current_date as my_date -``` - -Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing -conditions. - -Iceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`. - -It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported: - -- `append`: New records are appended to the table, this can lead to duplicates. -- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only - available with Athena engine version 3. - - `unique_key` **(required)**: columns that define a unique record in the source and target tables. - - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can - be useful for improving performance via predicate pushdown on the target table. - - `delete_condition` (optional): SQL condition used to identify records that should be deleted. - - `update_condition` (optional): SQL condition used to identify records that should be updated. - - `insert_condition` (optional): SQL condition used to identify records that should be inserted. - - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of - the incremental table (`src`) or the final table (`target`). - Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error. - -`delete_condition` example: - -```sql -{{ config( - materialized='incremental', - table_type='iceberg', - incremental_strategy='merge', - unique_key='user_id', - incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"], - delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year", - format='parquet' -) }} - -select 'A' as user_id, - 'pi' as name, - 'active' as status, - 17.89 as cost, - 1 as quantity, - 100000000 as quantity_big, - current_date as my_date -``` - -`update_condition` example: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key=['id'], - update_condition='target.id > 1', - schema='sandbox' - ) -}} - -{% if is_incremental() %} - -select * from ( - values - (1, 'v1-updated') - , (2, 'v2-updated') -) as t (id, value) - -{% else %} - -select * from ( - values - (-1, 'v-1') - , (0, 'v0') - , (1, 'v1') - , (2, 'v2') -) as t (id, value) - -{% endif %} -``` - -`insert_condition` example: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key=['id'], - insert_condition='target.status != 0', - schema='sandbox' - ) -}} - -select * from ( - values - (1, 0) - , (2, 1) -) as t (id, status) - -``` - -### Highly available table (HA) - -The current implementation of the table materialization can lead to downtime, as the target table is -dropped and re-created. To have the less destructive behavior it's possible to use the `ha` config on -your `table` materialized models. It leverages the table versions feature of glue catalog, creating -a temp table and swapping the target table to the location of the temp table. 
This materialization -is only available for `table_type=hive` and requires using unique locations. For iceberg, high -availability is the default. - -```sql -{{ config( - materialized='table', - ha=true, - format='parquet', - table_type='hive', - partitioned_by=['status'], - s3_data_naming='table_unique' -) }} - -select 'a' as user_id, - 'pi' as user_name, - 'active' as status -union all -select 'b' as user_id, - 'sh' as user_name, - 'disabled' as status -``` - -By default, the materialization keeps the last 4 table versions, you can change it by setting `versions_to_keep`. - -#### HA known issues - -- When swapping from a table with partitions to a table without (and the other way around), there could be a little - downtime. - If high performances is needed consider bucketing instead of partitions -- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location -- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed - -### Update glue data catalog - -Optionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as -[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties) -and [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html). -By default, documentation persistence is disabled, but it can be enabled for specific resources or -groups of resources as needed. - -For example: - -```yaml -models: - - name: test_deduplicate - description: another value - config: - persist_docs: - relation: true - columns: true - meta: - test: value - columns: - - name: id - meta: - primary_key: true -``` - -See [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details. - -## Snapshots - -The adapter supports snapshot materialization. It supports both timestamp and check strategy. To create a snapshot -create a snapshot file in the snapshots directory. If the directory does not exist create one. - -### Timestamp strategy - -To use the timestamp strategy refer to -the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended) - -### Check strategy - -To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy) - -### Hard-deletes - -The materialization also supports invalidating hard deletes. Check -the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage. 
- -### Working example - -seed file - employent_indicators_november_2022_csv_tables.csv - -```csv -Series_reference,Period,Data_value,Suppressed -MEIM.S1WA,1999.04,80267, -MEIM.S1WA,1999.05,70803, -MEIM.S1WA,1999.06,65792, -MEIM.S1WA,1999.07,66194, -MEIM.S1WA,1999.08,67259, -MEIM.S1WA,1999.09,69691, -MEIM.S1WA,1999.1,72475, -MEIM.S1WA,1999.11,79263, -MEIM.S1WA,1999.12,86540, -MEIM.S1WA,2000.01,82552, -MEIM.S1WA,2000.02,81709, -MEIM.S1WA,2000.03,84126, -MEIM.S1WA,2000.04,77089, -MEIM.S1WA,2000.05,73811, -MEIM.S1WA,2000.06,70070, -MEIM.S1WA,2000.07,69873, -MEIM.S1WA,2000.08,71468, -MEIM.S1WA,2000.09,72462, -MEIM.S1WA,2000.1,74897, -``` - -model.sql - -```sql -{{ config( - materialized='table' -) }} - -select row_number() over() as id - , * - , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp -from {{ ref('employment_indicators_november_2022_csv_tables') }} -``` - -timestamp strategy - model_snapshot_1 - -```sql -{% snapshot model_snapshot_1 %} - -{{ - config( - strategy='timestamp', - updated_at='refresh_timestamp', - unique_key='id' - ) -}} - -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -invalidate hard deletes - model_snapshot_2 - -```sql -{% snapshot model_snapshot_2 %} - -{{ - config - ( - unique_key='id', - strategy='timestamp', - updated_at='refresh_timestamp', - invalidate_hard_deletes=True, - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -check strategy - model_snapshot_3 - -```sql -{% snapshot model_snapshot_3 %} - -{{ - config - ( - unique_key='id', - strategy='check', - check_cols=['series_reference','data_value'] - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -### Snapshots known issues - -- Incremental Iceberg models - Sync all columns on schema change can't remove columns used for partitioning. - The only way, from a dbt perspective, is to do a full-refresh of the incremental model. - -- Tables, schemas and database names should only be lowercase - -- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not - installed in the target environment. - See <https://github.com/dbt-athena/dbt-athena/issues/103> for more details. - -- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column - from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history - -## AWS Lake Formation integration - -The adapter implements AWS Lake Formation tags management in the following way: - -- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default) -- Once you enable the feature, lf-tags will be updated on every dbt run -- First, all lf-tags for columns are removed to avoid inheritance issues -- Then, all redundant lf-tags are removed from tables and actual tags from table configs are applied -- Finally, lf-tags for columns are applied - -It's important to understand the following points: - -- dbt does not manage lf-tags for databases -- dbt does not manage Lake Formation permissions - -That's why you should handle this by yourself manually or using an automation tool like terraform, AWS CDK etc. 
-You may find the following links useful to manage that: - -<!-- markdownlint-disable --> -* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) -* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags) -<!-- markdownlint-restore --> - -## Python models - -The adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). - -### Setup - -- A Spark-enabled workgroup created in Athena -- Spark execution role granted access to Athena, Glue and S3 -- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used - is referenced in `dbt_project.yml` - -### Spark-specific table configuration - -- `timeout` (`default=43200`) - - Time out in seconds for each Python model execution. Defaults to 12 hours/43200 seconds. -- `spark_encryption` (`default=false`) - - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored - locally by Spark. -- `spark_cross_account_catalog` (`default=false`) - - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same - AWS account by default. However, sometimes you want to query another catalog located on an external AWS - account. Setting this additional Spark properties parameter to true will enable querying external catalogs. - You can use the syntax `external_catalog_id/database.table` to access the external table on the external - catalog (ex: `999999999999/mydatabase.cloudfront_logs` where 999999999999 is the external catalog ID) -- `spark_requester_pays` (`default=false`) - - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for - data access and data transfer fees associated with the query. - - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark. - -### Spark notes - -- A session is created for each unique engine configuration defined in the models that are part of the invocation. -- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation - (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session. -- The number of Python models running at a time depends on the `threads`. The number of sessions created for the - entire run depends on the number of unique engine configurations and the availability of sessions to maintain - thread concurrency. -- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2. - This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark. 
- -### Example models - -#### Simple pandas model - -```python -import pandas as pd - - -def model(dbt, session): - dbt.config(materialized="table") - - model_df = pd.DataFrame({"A": [1, 2, 3, 4]}) - - return model_df -``` - -#### Simple spark - -```python -def model(dbt, spark_session): - dbt.config(materialized="table") - - data = [(1,), (2,), (3,), (4,)] - - df = spark_session.createDataFrame(data, ["A"]) - - return df -``` - -#### Spark incremental - -```python -def model(dbt, spark_session): - dbt.config(materialized="incremental") - df = dbt.ref("model") - - if dbt.is_incremental: - max_from_this = ( - f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}" - ) - df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0]) - - return df -``` - -#### Config spark model - -```python -def model(dbt, spark_session): - dbt.config( - materialized="table", - engine_config={ - "CoordinatorDpuSize": 1, - "MaxConcurrentDpus": 3, - "DefaultExecutorDpuSize": 1 - }, - spark_encryption=True, - spark_cross_account_catalog=True, - spark_requester_pays=True - polling_interval=15, - timeout=120, - ) - - data = [(1,), (2,), (3,), (4,)] - - df = spark_session.createDataFrame(data, ["A"]) - - return df -``` - -#### Create pySpark udf using imported external python files - -```python -def model(dbt, spark_session): - dbt.config( - materialized="incremental", - incremental_strategy="merge", - unique_key="num", - ) - sc = spark_session.sparkContext - sc.addPyFile("s3://athena-dbt/test/file1.py") - sc.addPyFile("s3://athena-dbt/test/file2.py") - - def func(iterator): - from file2 import transform - - return [transform(i) for i in iterator] - - from pyspark.sql.functions import udf - from pyspark.sql.functions import col - - udf_with_import = udf(func) - - data = [(1, "a"), (2, "b"), (3, "c")] - cols = ["num", "alpha"] - df = spark_session.createDataFrame(data, cols) - - return df.withColumn("udf_test_col", udf_with_import(col("alpha"))) -``` - -### Known issues in Python models - -- Python models cannot - [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). -- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list] - or [imported manually][imported manually]. -- Python models can only reference or write to tables with names meeting the - regular expression: `^[0-9a-zA-Z_]+$`. Dashes and special characters are not - supported by Spark, even though Athena supports them. -- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic which - runs on Trino. -- Snapshot materializations are not supported. -- Spark can only reference tables within the same catalog. -- For tables created outside of the dbt tool, be sure to populate the location field or dbt will throw an error -when trying to create the table. - -[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html -[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html - -## Contracts - -The adapter partly supports contract definitions: - -- `data_type` is supported but needs to be adjusted for complex types. Types must be specified - entirely (for instance `array<int>`) even though they won't be checked. Indeed, as dbt recommends, we only compare - the broader type (array, map, int, varchar). 
The complete definition is used in order to check that the data types - defined in Athena are ok (pre-flight check). -- The adapter does not support the constraints since there is no constraint concept in Athena. - -## Contributing - -See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project. - -## Contributors ✨ - -Thanks goes to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)): - -<a href="https://github.com/dbt-athena/dbt-athena/graphs/contributors"> - <img src="https://contrib.rocks/image?repo=dbt-athena/dbt-athena" /> -</a> - -Contributions of any kind welcome! +This repository as moved into the `dbt-labs/dbt-adapters` monorepo found +[here](https://www.github.com/dbt-labs/dbt-adapters). diff --git a/dbt-athena/CONTRIBUTING.md b/dbt-athena/CONTRIBUTING.md deleted file mode 100644 index e69de29b..00000000 diff --git a/dbt-athena/README.md b/dbt-athena/README.md index c5487c05..99e2ec02 100644 --- a/dbt-athena/README.md +++ b/dbt-athena/README.md @@ -21,862 +21,5 @@ </a> </p> -<!-- TOC --> -- [Features](#features) - - [Quick start](#quick-start) - - [Installation](#installation) - - [Prerequisites](#prerequisites) - - [Credentials](#credentials) - - [Configuring your profile](#configuring-your-profile) - - [Additional information](#additional-information) - - [Models](#models) - - [Table configuration](#table-configuration) - - [Table location](#table-location) - - [Incremental models](#incremental-models) - - [On schema change](#on-schema-change) - - [Iceberg](#iceberg) - - [Highly available table (HA)](#highly-available-table-ha) - - [HA known issues](#ha-known-issues) - - [Update glue data catalog](#update-glue-data-catalog) - - [Snapshots](#snapshots) - - [Timestamp strategy](#timestamp-strategy) - - [Check strategy](#check-strategy) - - [Hard-deletes](#hard-deletes) - - [Working example](#working-example) - - [Snapshots known issues](#snapshots-known-issues) - - [AWS Lake Formation integration](#aws-lake-formation-integration) - - [Python models](#python-models) - - [Contracts](#contracts) - - [Contributing](#contributing) - - [Contributors ✨](#contributors-) -<!-- TOC --> - -# Features - -- Supports dbt version `1.7.*` -- Support for Python -- Supports [seeds][seeds] -- Correctly detects views and their columns -- Supports [table materialization][table] - - [Iceberg tables][athena-iceberg] are supported **only with Athena Engine v3** and **a unique table location** - (see table location section below) - - Hive tables are supported by both Athena engines -- Supports [incremental models][incremental] - - On Iceberg tables: - - Supports the use of `unique_key` only with the `merge` strategy - - Supports the `append` strategy - - On Hive tables: - - Supports two incremental update strategies: `insert_overwrite` and `append` - - Does **not** support the use of `unique_key` -- Supports [snapshots][snapshots] -- Supports [Python models][python-models] - -[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds - -[incremental]: https://docs.getdbt.com/docs/build/incremental-models - -[table]: https://docs.getdbt.com/docs/build/materializations#table - -[python-models]: https://docs.getdbt.com/docs/build/python-models#configuring-python-models - -[athena-iceberg]: https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg.html - -[snapshots]: https://docs.getdbt.com/docs/build/snapshots - -## Quick start - -### Installation - -- `pip install dbt-athena-community` -- Or `pip install 
git+https://github.com/dbt-athena/dbt-athena.git` - -### Prerequisites - -To start, you will need an S3 bucket, for instance `my-bucket` and an Athena database: - -```sql -CREATE DATABASE IF NOT EXISTS analytics_dev -COMMENT 'Analytics models generated by dbt (development)' -LOCATION 's3://my-bucket/' -WITH DBPROPERTIES ('creator'='Foo Bar', 'email'='foo@bar.com'); -``` - -Notes: - -- Take note of your AWS region code (e.g. `us-west-2` or `eu-west-2`, etc.). -- You can also use [AWS Glue](https://docs.aws.amazon.com/athena/latest/ug/glue-athena.html) to create and manage Athena - databases. - -### Credentials - -Credentials can be passed directly to the adapter, or they can -be [determined automatically](https://boto3.amazonaws.com/v1/documentation/api/latest/guide/credentials.html) based -on `aws cli`/`boto3` conventions. -You can either: - -- Configure `aws_access_key_id` and `aws_secret_access_key` -- Configure `aws_profile_name` to match a profile defined in your AWS credentials file. - Checkout dbt profile configuration below for details. - -### Configuring your profile - -A dbt profile can be configured to run against AWS Athena using the following configuration: - -| Option | Description | Required? | Example | -|-----------------------|------------------------------------------------------------------------------------------|-----------|--------------------------------------------| -| s3_staging_dir | S3 location to store Athena query results and metadata | Required | `s3://bucket/dbt/` | -| s3_data_dir | Prefix for storing tables, if different from the connection's `s3_staging_dir` | Optional | `s3://bucket2/dbt/` | -| s3_data_naming | How to generate table paths in `s3_data_dir` | Optional | `schema_table_unique` | -| s3_tmp_table_dir | Prefix for storing temporary tables, if different from the connection's `s3_data_dir` | Optional | `s3://bucket3/dbt/` | -| region_name | AWS region of your Athena instance | Required | `eu-west-1` | -| schema | Specify the schema (Athena database) to build models into (lowercase **only**) | Required | `dbt` | -| database | Specify the database (Data catalog) to build models into (lowercase **only**) | Required | `awsdatacatalog` | -| poll_interval | Interval in seconds to use for polling the status of query results in Athena | Optional | `5` | -| debug_query_state | Flag if debug message with Athena query state is needed | Optional | `false` | -| aws_access_key_id | Access key ID of the user performing requests | Optional | `AKIAIOSFODNN7EXAMPLE` | -| aws_secret_access_key | Secret access key of the user performing requests | Optional | `wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY` | -| aws_profile_name | Profile to use from your AWS shared credentials file | Optional | `my-profile` | -| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` | -| skip_workgroup_check | Indicates if the WorkGroup check (additional AWS call) can be skipped | Optional | `true` | -| num_retries | Number of times to retry a failing query | Optional | `3` | -| num_boto3_retries | Number of times to retry boto3 requests (e.g. 
deleting S3 files for materialized tables) | Optional | `5` | -| num_iceberg_retries | Number of times to retry iceberg commit queries to fix ICEBERG_COMMIT_ERROR | Optional | `3` | -| spark_work_group | Identifier of Athena Spark workgroup for running Python models | Optional | `my-spark-workgroup` | -| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` | -| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` | - -**Example profiles.yml entry:** - -```yaml -athena: - target: dev - outputs: - dev: - type: athena - s3_staging_dir: s3://athena-query-results/dbt/ - s3_data_dir: s3://your_s3_bucket/dbt/ - s3_data_naming: schema_table - s3_tmp_table_dir: s3://your_s3_bucket/temp/ - region_name: eu-west-1 - schema: dbt - database: awsdatacatalog - threads: 4 - aws_profile_name: my-profile - work_group: my-workgroup - spark_work_group: my-spark-workgroup - seed_s3_upload_args: - ACL: bucket-owner-full-control -``` - -### Additional information - -- `threads` is supported -- `database` and `catalog` can be used interchangeably - -## Models - -### Table configuration - -- `external_location` (`default=none`) - - If set, the full S3 path to which the table will be saved - - Works only with incremental models - - Does not work with Hive table with `ha` set to true -- `partitioned_by` (`default=none`) - - An array list of columns by which the table will be partitioned - - Limited to creation of 100 partitions (*currently*) -- `bucketed_by` (`default=none`) - - An array list of columns to bucket data, ignored if using Iceberg -- `bucket_count` (`default=none`) - - The number of buckets for bucketing your data, ignored if using Iceberg -- `table_type` (`default='hive'`) - - The type of table - - Supports `hive` or `iceberg` -- `ha` (`default=false`) - - If the table should be built using the high-availability method. This option is only available for Hive tables - since it is by default for Iceberg tables (see the section [below](#highly-available-table-ha)) -- `format` (`default='parquet'`) - - The data format for the table - - Supports `ORC`, `PARQUET`, `AVRO`, `JSON`, `TEXTFILE` -- `write_compression` (`default=none`) - - The compression type to use for any storage format that allows compression to be specified. To see which options are - available, check out [CREATE TABLE AS][create-table-as] -- `field_delimiter` (`default=none`) - - Custom field delimiter, for when format is set to `TEXTFILE` -- `table_properties`: table properties to add to the table, valid for Iceberg only -- `native_drop`: Relation drop operations will be performed with SQL, not direct Glue API calls. No S3 calls will be - made to manage data in S3. Data in S3 will only be cleared up for Iceberg - tables [see AWS docs](https://docs.aws.amazon.com/athena/latest/ug/querying-iceberg-managing-tables.html). Note that - Iceberg DROP TABLE operations may timeout if they take longer than 60 seconds. -- `seed_by_insert` (`default=false`) - - Default behaviour uploads seed data to S3. 
This flag will create seeds using an SQL insert statement - - Large seed files cannot use `seed_by_insert`, as the SQL insert statement would - exceed [the Athena limit of 262144 bytes](https://docs.aws.amazon.com/athena/latest/ug/service-limits.html) -- `force_batch` (`default=false`) - - Skip creating the table as CTAS and run the operation directly in batch insert mode - - This is particularly useful when the standard table creation process fails due to partition limitations, - allowing you to work with temporary tables and persist the dataset more efficiently -- `unique_tmp_table_suffix` (`default=false`) - - For incremental models using insert overwrite strategy on hive table - - Replace the __dbt_tmp suffix used as temporary table name suffix by a unique uuid - - Useful if you are looking to run multiple dbt build inserting in the same table in parallel -- `temp_schema` (`default=none`) - - For incremental models, it allows to define a schema to hold temporary create statements - used in incremental model runs - - Schema will be created in the model target database if does not exist -- `lf_tags_config` (`default=none`) - - [AWS Lake Formation](#aws-lake-formation-integration) tags to associate with the table and columns - - `enabled` (`default=False`) whether LF tags management is enabled for a model - - `tags` dictionary with tags and their values to assign for the model - - `tags_columns` dictionary with a tag key, value and list of columns they must be assigned to - - `lf_inherited_tags` (`default=none`) - - List of Lake Formation tag keys that are intended to be inherited from the database level and thus shouldn't be - removed during association of those defined in `lf_tags_config` - - i.e., the default behavior of `lf_tags_config` is to be exhaustive and first remove any pre-existing tags from - tables and columns before associating the ones currently defined for a given model - - This breaks tag inheritance as inherited tags appear on tables and columns like those associated directly - -```sql -{{ - config( - materialized='incremental', - incremental_strategy='append', - on_schema_change='append_new_columns', - table_type='iceberg', - schema='test_schema', - lf_tags_config={ - 'enabled': true, - 'tags': { - 'tag1': 'value1', - 'tag2': 'value2' - }, - 'tags_columns': { - 'tag1': { - 'value1': ['column1', 'column2'], - 'value2': ['column3', 'column4'] - } - }, - 'inherited_tags': ['tag1', 'tag2'] - } - ) -}} -``` - -- Format for `dbt_project.yml`: - -```yaml - +lf_tags_config: - enabled: true - tags: - tag1: value1 - tag2: value2 - tags_columns: - tag1: - value1: [ column1, column2 ] - inherited_tags: [ tag1, tag2 ] -``` - -- `lf_grants` (`default=none`) - - Lake Formation grants config for data_cell filters - - Format: - - ```python - lf_grants={ - 'data_cell_filters': { - 'enabled': True | False, - 'filters': { - 'filter_name': { - 'row_filter': '<filter_condition>', - 'principals': ['principal_arn1', 'principal_arn2'] - } - } - } - } - ``` - -> Notes: -> -> - `lf_tags` and `lf_tags_columns` configs support only attaching lf tags to corresponding resources. -> We recommend managing LF Tags permissions somewhere outside dbt. For example, you may use -> [terraform](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) or -> [aws cdk](https://docs.aws.amazon.com/cdk/api/v1/docs/aws-lakeformation-readme.html) for such purpose. 
-> - `data_cell_filters` management can't be automated outside dbt because the filter can't be attached to the table -> which doesn't exist. Once you `enable` this config, dbt will set all filters and their permissions during every -> dbt run. Such approach keeps the actual state of row level security configuration actual after every dbt run and -> apply changes if they occur: drop, create, update filters and their permissions. -> - Any tags listed in `lf_inherited_tags` should be strictly inherited from the database level and never overridden at - the table and column level -> - Currently `dbt-athena` does not differentiate between an inherited tag association and an override of same it made -> previously -> - e.g. If an inherited tag is overridden by an `lf_tags_config` value in one DBT run, and that override is removed - prior to a subsequent run, the prior override will linger and no longer be encoded anywhere (in e.g. Terraform - where the inherited value is configured nor in the DBT project where the override previously existed but now is - gone) - -[create-table-as]: https://docs.aws.amazon.com/athena/latest/ug/create-table-as.html#ctas-table-properties - -### Table location - -The location a table is saved to is determined by: - -1. If `external_location` is defined, that value is used -2. If `s3_data_dir` is defined, the path is determined by that and `s3_data_naming` -3. If `s3_data_dir` is not defined, data is stored under `s3_staging_dir/tables/` - -Here all the options available for `s3_data_naming`: - -- `unique`: `{s3_data_dir}/{uuid4()}/` -- `table`: `{s3_data_dir}/{table}/` -- `table_unique`: `{s3_data_dir}/{table}/{uuid4()}/` -- `schema_table`: `{s3_data_dir}/{schema}/{table}/` -- `s3_data_naming=schema_table_unique`: `{s3_data_dir}/{schema}/{table}/{uuid4()}/` - -It's possible to set the `s3_data_naming` globally in the target profile, or overwrite the value in the table config, -or setting up the value for groups of model in dbt_project.yml. - -> Note: when using a workgroup with a default output location configured, `s3_data_naming` and any configured buckets -> are ignored and the location configured in the workgroup is used. - -### Incremental models - -Support for [incremental models](https://docs.getdbt.com/docs/build/incremental-models). - -These strategies are supported: - -- `insert_overwrite` (default): The insert overwrite strategy deletes the overlapping partitions from the destination - table, and then inserts the new records from the source. This strategy depends on the `partitioned_by` keyword! If no - partitions are defined, dbt will fall back to the `append` strategy. -- `append`: Insert new records without updating, deleting or overwriting any existing data. There might be duplicate - data (e.g. great for log or historical data). -- `merge`: Conditionally updates, deletes, or inserts rows into an Iceberg table. Used in combination with `unique_key`. - Only available when using Iceberg. - -### On schema change - -`on_schema_change` is an option to reflect changes of schema in incremental models. -The following options are supported: - -- `ignore` (default) -- `fail` -- `append_new_columns` -- `sync_all_columns` - -For details, please refer -to [dbt docs](https://docs.getdbt.com/docs/build/incremental-models#what-if-the-columns-of-my-incremental-model-change). - -### Iceberg - -The adapter supports table materialization for Iceberg. 
- -To get started just add this as your model: - -```sql -{{ config( - materialized='table', - table_type='iceberg', - format='parquet', - partitioned_by=['bucket(user_id, 5)'], - table_properties={ - 'optimize_rewrite_delete_file_threshold': '2' - } -) }} - -select 'A' as user_id, - 'pi' as name, - 'active' as status, - 17.89 as cost, - 1 as quantity, - 100000000 as quantity_big, - current_date as my_date -``` - -Iceberg supports bucketing as hidden partitions, therefore use the `partitioned_by` config to add specific bucketing -conditions. - -Iceberg supports several table formats for data : `PARQUET`, `AVRO` and `ORC`. - -It is possible to use Iceberg in an incremental fashion, specifically two strategies are supported: - -- `append`: New records are appended to the table, this can lead to duplicates. -- `merge`: Performs an upsert (and optional delete), where new records are added and existing records are updated. Only - available with Athena engine version 3. - - `unique_key` **(required)**: columns that define a unique record in the source and target tables. - - `incremental_predicates` (optional): SQL conditions that enable custom join clauses in the merge statement. This can - be useful for improving performance via predicate pushdown on the target table. - - `delete_condition` (optional): SQL condition used to identify records that should be deleted. - - `update_condition` (optional): SQL condition used to identify records that should be updated. - - `insert_condition` (optional): SQL condition used to identify records that should be inserted. - - `incremental_predicates`, `delete_condition`, `update_condition` and `insert_condition` can include any column of - the incremental table (`src`) or the final table (`target`). - Column names must be prefixed by either `src` or `target` to prevent a `Column is ambiguous` error. - -`delete_condition` example: - -```sql -{{ config( - materialized='incremental', - table_type='iceberg', - incremental_strategy='merge', - unique_key='user_id', - incremental_predicates=["src.quantity > 1", "target.my_date >= now() - interval '4' year"], - delete_condition="src.status != 'active' and target.my_date < now() - interval '2' year", - format='parquet' -) }} - -select 'A' as user_id, - 'pi' as name, - 'active' as status, - 17.89 as cost, - 1 as quantity, - 100000000 as quantity_big, - current_date as my_date -``` - -`update_condition` example: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key=['id'], - update_condition='target.id > 1', - schema='sandbox' - ) -}} - -{% if is_incremental() %} - -select * from ( - values - (1, 'v1-updated') - , (2, 'v2-updated') -) as t (id, value) - -{% else %} - -select * from ( - values - (-1, 'v-1') - , (0, 'v0') - , (1, 'v1') - , (2, 'v2') -) as t (id, value) - -{% endif %} -``` - -`insert_condition` example: - -```sql -{{ config( - materialized='incremental', - incremental_strategy='merge', - unique_key=['id'], - insert_condition='target.status != 0', - schema='sandbox' - ) -}} - -select * from ( - values - (1, 0) - , (2, 1) -) as t (id, status) - -``` - -### Highly available table (HA) - -The current implementation of the table materialization can lead to downtime, as the target table is -dropped and re-created. To have the less destructive behavior it's possible to use the `ha` config on -your `table` materialized models. It leverages the table versions feature of glue catalog, creating -a temp table and swapping the target table to the location of the temp table. 
This materialization -is only available for `table_type=hive` and requires using unique locations. For iceberg, high -availability is the default. - -```sql -{{ config( - materialized='table', - ha=true, - format='parquet', - table_type='hive', - partitioned_by=['status'], - s3_data_naming='table_unique' -) }} - -select 'a' as user_id, - 'pi' as user_name, - 'active' as status -union all -select 'b' as user_id, - 'sh' as user_name, - 'disabled' as status -``` - -By default, the materialization keeps the last 4 table versions, you can change it by setting `versions_to_keep`. - -#### HA known issues - -- When swapping from a table with partitions to a table without (and the other way around), there could be a little - downtime. - If high performances is needed consider bucketing instead of partitions -- By default, Glue "duplicates" the versions internally, so the last two versions of a table point to the same location -- It's recommended to set `versions_to_keep` >= 4, as this will avoid having the older location removed - -### Update glue data catalog - -Optionally persist resource descriptions as column and relation comments to the glue data catalog, and meta as -[glue table properties](https://docs.aws.amazon.com/glue/latest/dg/tables-described.html#table-properties) -and [column parameters](https://docs.aws.amazon.com/glue/latest/webapi/API_Column.html). -By default, documentation persistence is disabled, but it can be enabled for specific resources or -groups of resources as needed. - -For example: - -```yaml -models: - - name: test_deduplicate - description: another value - config: - persist_docs: - relation: true - columns: true - meta: - test: value - columns: - - name: id - meta: - primary_key: true -``` - -See [persist docs](https://docs.getdbt.com/reference/resource-configs/persist_docs) for more details. - -## Snapshots - -The adapter supports snapshot materialization. It supports both timestamp and check strategy. To create a snapshot -create a snapshot file in the snapshots directory. If the directory does not exist create one. - -### Timestamp strategy - -To use the timestamp strategy refer to -the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#timestamp-strategy-recommended) - -### Check strategy - -To use the check strategy refer to the [dbt docs](https://docs.getdbt.com/docs/build/snapshots#check-strategy) - -### Hard-deletes - -The materialization also supports invalidating hard deletes. Check -the [docs](https://docs.getdbt.com/docs/build/snapshots#hard-deletes-opt-in) to understand usage. 
- -### Working example - -seed file - employent_indicators_november_2022_csv_tables.csv - -```csv -Series_reference,Period,Data_value,Suppressed -MEIM.S1WA,1999.04,80267, -MEIM.S1WA,1999.05,70803, -MEIM.S1WA,1999.06,65792, -MEIM.S1WA,1999.07,66194, -MEIM.S1WA,1999.08,67259, -MEIM.S1WA,1999.09,69691, -MEIM.S1WA,1999.1,72475, -MEIM.S1WA,1999.11,79263, -MEIM.S1WA,1999.12,86540, -MEIM.S1WA,2000.01,82552, -MEIM.S1WA,2000.02,81709, -MEIM.S1WA,2000.03,84126, -MEIM.S1WA,2000.04,77089, -MEIM.S1WA,2000.05,73811, -MEIM.S1WA,2000.06,70070, -MEIM.S1WA,2000.07,69873, -MEIM.S1WA,2000.08,71468, -MEIM.S1WA,2000.09,72462, -MEIM.S1WA,2000.1,74897, -``` - -model.sql - -```sql -{{ config( - materialized='table' -) }} - -select row_number() over() as id - , * - , cast(from_unixtime(to_unixtime(now())) as timestamp(6)) as refresh_timestamp -from {{ ref('employment_indicators_november_2022_csv_tables') }} -``` - -timestamp strategy - model_snapshot_1 - -```sql -{% snapshot model_snapshot_1 %} - -{{ - config( - strategy='timestamp', - updated_at='refresh_timestamp', - unique_key='id' - ) -}} - -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -invalidate hard deletes - model_snapshot_2 - -```sql -{% snapshot model_snapshot_2 %} - -{{ - config - ( - unique_key='id', - strategy='timestamp', - updated_at='refresh_timestamp', - invalidate_hard_deletes=True, - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -check strategy - model_snapshot_3 - -```sql -{% snapshot model_snapshot_3 %} - -{{ - config - ( - unique_key='id', - strategy='check', - check_cols=['series_reference','data_value'] - ) -}} -select * -from {{ ref('model') }} {% endsnapshot %} -``` - -### Snapshots known issues - -- Incremental Iceberg models - Sync all columns on schema change can't remove columns used for partitioning. - The only way, from a dbt perspective, is to do a full-refresh of the incremental model. - -- Tables, schemas and database names should only be lowercase - -- In order to avoid potential conflicts, make sure [`dbt-athena-adapter`](https://github.com/Tomme/dbt-athena) is not - installed in the target environment. - See <https://github.com/dbt-athena/dbt-athena/issues/103> for more details. - -- Snapshot does not support dropping columns from the source table. If you drop a column make sure to drop the column - from the snapshot as well. Another workaround is to NULL the column in the snapshot definition to preserve history - -## AWS Lake Formation integration - -The adapter implements AWS Lake Formation tags management in the following way: - -- You can enable or disable lf-tags management via [config](#table-configuration) (disabled by default) -- Once you enable the feature, lf-tags will be updated on every dbt run -- First, all lf-tags for columns are removed to avoid inheritance issues -- Then, all redundant lf-tags are removed from tables and actual tags from table configs are applied -- Finally, lf-tags for columns are applied - -It's important to understand the following points: - -- dbt does not manage lf-tags for databases -- dbt does not manage Lake Formation permissions - -That's why you should handle this by yourself manually or using an automation tool like terraform, AWS CDK etc. 
-You may find the following links useful to manage that: - -<!-- markdownlint-disable --> -* [terraform aws_lakeformation_permissions](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_permissions) -* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags) -<!-- markdownlint-restore --> - -## Python models - -The adapter supports Python models using [`spark`](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html). - -### Setup - -- A Spark-enabled workgroup created in Athena -- Spark execution role granted access to Athena, Glue and S3 -- The Spark workgroup is added to the `~/.dbt/profiles.yml` file and the profile to be used - is referenced in `dbt_project.yml` - -### Spark-specific table configuration - -- `timeout` (`default=43200`) - - Time out in seconds for each Python model execution. Defaults to 12 hours/43200 seconds. -- `spark_encryption` (`default=false`) - - If this flag is set to true, encrypts data in transit between Spark nodes and also encrypts data at rest stored - locally by Spark. -- `spark_cross_account_catalog` (`default=false`) - - When using the Spark Athena workgroup, queries can only be made against catalogs located on the same - AWS account by default. However, sometimes you want to query another catalog located on an external AWS - account. Setting this additional Spark properties parameter to true will enable querying external catalogs. - You can use the syntax `external_catalog_id/database.table` to access the external table on the external - catalog (ex: `999999999999/mydatabase.cloudfront_logs` where 999999999999 is the external catalog ID) -- `spark_requester_pays` (`default=false`) - - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged for - data access and data transfer fees associated with the query. - - If this flag is set to true, requester pays S3 buckets are enabled in Athena for Spark. - -### Spark notes - -- A session is created for each unique engine configuration defined in the models that are part of the invocation. -- A session's idle timeout is set to 10 minutes. Within the timeout period, if there is a new calculation - (Spark Python model) ready for execution and the engine configuration matches, the process will reuse the same session. -- The number of Python models running at a time depends on the `threads`. The number of sessions created for the - entire run depends on the number of unique engine configurations and the availability of sessions to maintain - thread concurrency. -- For Iceberg tables, it is recommended to use `table_properties` configuration to set the `format_version` to 2. - This is to maintain compatibility between Iceberg tables created by Trino with those created by Spark. 
-
-### Example models
-
-#### Simple pandas model
-
-```python
-import pandas as pd
-
-
-def model(dbt, session):
-    dbt.config(materialized="table")
-
-    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})
-
-    return model_df
-```
-
-#### Simple Spark
-
-```python
-def model(dbt, spark_session):
-    dbt.config(materialized="table")
-
-    data = [(1,), (2,), (3,), (4,)]
-
-    df = spark_session.createDataFrame(data, ["A"])
-
-    return df
-```
-
-#### Spark incremental
-
-```python
-def model(dbt, spark_session):
-    dbt.config(materialized="incremental")
-    df = dbt.ref("model")
-
-    if dbt.is_incremental:
-        max_from_this = (
-            f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
-        )
-        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])
-
-    return df
-```
-
-#### Config Spark model
-
-```python
-def model(dbt, spark_session):
-    dbt.config(
-        materialized="table",
-        engine_config={
-            "CoordinatorDpuSize": 1,
-            "MaxConcurrentDpus": 3,
-            "DefaultExecutorDpuSize": 1,
-        },
-        spark_encryption=True,
-        spark_cross_account_catalog=True,
-        spark_requester_pays=True,
-        polling_interval=15,
-        timeout=120,
-    )
-
-    data = [(1,), (2,), (3,), (4,)]
-
-    df = spark_session.createDataFrame(data, ["A"])
-
-    return df
-```
-
-#### Create pySpark UDF using imported external Python files
-
-```python
-def model(dbt, spark_session):
-    dbt.config(
-        materialized="incremental",
-        incremental_strategy="merge",
-        unique_key="num",
-    )
-    sc = spark_session.sparkContext
-    sc.addPyFile("s3://athena-dbt/test/file1.py")
-    sc.addPyFile("s3://athena-dbt/test/file2.py")
-
-    def func(iterator):
-        from file2 import transform
-
-        return [transform(i) for i in iterator]
-
-    from pyspark.sql.functions import col, udf
-
-    udf_with_import = udf(func)
-
-    data = [(1, "a"), (2, "b"), (3, "c")]
-    cols = ["num", "alpha"]
-    df = spark_session.createDataFrame(data, cols)
-
-    return df.withColumn("udf_test_col", udf_with_import(col("alpha")))
-```
-
-### Known issues in Python models
-
-- Python models cannot
-  [reference Athena SQL views](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).
-- Third-party Python libraries can be used, but they must be [included in the pre-installed list][pre-installed list]
-  or [imported manually][imported manually].
-- Python models can only reference or write to tables whose names match the
-  regular expression `^[0-9a-zA-Z_]+$`. Dashes and special characters are not
-  supported by Spark, even though Athena supports them.
-- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic that
-  runs on Trino.
-- Snapshot materializations are not supported.
-- Spark can only reference tables within the same catalog.
-- For tables created outside of dbt, be sure to populate the location field, or dbt will throw an error
-  when trying to create the table.
-
-[pre-installed list]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark-preinstalled-python-libraries.html
-[imported manually]: https://docs.aws.amazon.com/athena/latest/ug/notebooks-import-files-libraries.html
-
-## Contracts
-
-The adapter partly supports contract definitions:
-
-- `data_type` is supported but needs to be adjusted for complex types. Types must be specified in full
-  (for instance `array<int>`) even though they won't be checked in depth: as dbt recommends, only the
-  broader type (array, map, int, varchar) is compared. The complete definition is still used in a pre-flight
-  check that the data types defined in Athena are correct.
-- The adapter does not support constraints, since Athena has no concept of constraints.
-
-## Contributing
-
-See [CONTRIBUTING](CONTRIBUTING.md) for more information on how to contribute to this project.
-
-## Contributors ✨
-
-Thanks go to these wonderful people ([emoji key](https://allcontributors.org/docs/en/emoji-key)):
-
-<a href="https://github.com/dbt-athena/dbt-athena/graphs/contributors">
-  <img src="https://contrib.rocks/image?repo=dbt-athena/dbt-athena" />
-</a>
-
-Contributions of any kind are welcome!
+This repository has moved into the `dbt-labs/dbt-adapters` monorepo found
+[here](https://www.github.com/dbt-labs/dbt-adapters).