Add primary key constraint for new models #104

Open · wants to merge 6 commits into master
20 changes: 9 additions & 11 deletions README.md
@@ -31,13 +31,13 @@ Below is a table for what features the current Vertica adapter supports for dbt.
* **Passes Test** - The tests have passed, though they haven't been tested in a production-like environment
### Vertica Features
Below is a table of the Vertica features the current adapter supports. This is constantly improving and changing as dbt adds new functionality and the dbt-vertica driver improves.
| Vertica Features      | Supported |
| --------------------- | --------- |
| Create/Drop Schema    | Yes |
| Analyze Statistics | No |
| Purge Delete Vectors | No |
| Projection Management | No |
| Primary/Unique Keys | No |
| Primary/Unique Keys | Primary key only |
| Other DDLs | No |

## Installation
@@ -54,14 +54,14 @@ your-profile:
type: vertica # Don't change this!
host: [hostname]
port: [port] # or your custom port (optional)
username: [your username]
password: [your password]
database: [database name]
schema: [dbt schema]
connection_load_balance: True
backup_server_node: [list of backup hostnames or IPs]
retries: [1 or more]
threads: [1 or more]
target: dev

```
@@ -117,9 +117,7 @@ You need the pytest dbt adapter:
pip3 install dbt-tests-adapter==1.3.0

Run tests via:

pytest tests/functional/adapter/
# run an individual test
pytest tests/functional/adapter/test_basic.py


52 changes: 45 additions & 7 deletions dbt/adapters/vertica/connections.py
@@ -55,7 +55,7 @@ class verticaCredentials(Credentials):
# backup_server_node: Optional[str] = None

# additional_info = {
# 'password': str,
# 'backup_server_node': list# invalid value to be set in a connection string
# }

@@ -97,9 +97,9 @@ def open(cls, connection):
'connection_load_balance':credentials.connection_load_balance,
'session_label': f'dbt_{credentials.username}',
'retries': credentials.retries,

'backup_server_node':credentials.backup_server_node,

}

# if credentials.ssl.lower() in {'true', 'yes', 'please'}:
@@ -119,16 +119,16 @@ def open(cls, connection):
context = ssl.create_default_context()
conn_info['ssl'] = context
logger.debug(f'SSL is on')

def connect():
handle = vertica_python.connect(**conn_info)
logger.debug(f':P Connection work {handle}')
connection.state = 'open'
connection.handle = handle
logger.debug(f':P Connected to database: {credentials.database} at {credentials.host} at {handle}')
return handle




except Exception as exc:
@@ -184,6 +184,45 @@ def cancel(self, connection):
logger.debug(':P Cancel query')
connection.handle.cancel()

    @classmethod
    def get_result_from_cursor(cls, cursor: Any) -> agate.Table:
        data: List[Any] = []
        column_names: List[str] = []

        if cursor.description is not None:
            column_names = [col[0] for col in cursor.description]
            rows = cursor.fetchall()

            # check the result of every statement when the SQL contains multiple ';'-separated queries
            while cursor.nextset():
                check = cursor._message
                if isinstance(check, ErrorResponse):
                    logger.debug(f'Cursor message is: {check}')
                    raise dbt.exceptions.DatabaseException(str(check))

            data = cls.process_results(column_names, rows)

        return dbt.clients.agate_helper.table_from_data_flat(data, column_names)

    def execute(
        self, sql: str, auto_begin: bool = False, fetch: bool = False
    ) -> Tuple[AdapterResponse, agate.Table]:
        sql = self._add_query_comment(sql)
        _, cursor = self.add_query(sql, auto_begin)
        response = self.get_response(cursor)
        if fetch:
            table = self.get_result_from_cursor(cursor)
        else:
            table = dbt.clients.agate_helper.empty_table()
            # surface errors from trailing statements even when no result set is fetched
            while cursor.nextset():
                check = cursor._message
                if isinstance(check, vertica_python.vertica.messages.ErrorResponse):
                    logger.debug(f'Cursor message is: {check}')
                    self.release()
                    raise dbt.exceptions.DatabaseException(str(check))
        return response, table


@contextmanager
def exception_handler(self, sql):
@@ -197,4 +236,3 @@ def exception_handler(self, sql):
logger.debug(f':P Error: {exc}')
self.release()
raise dbt.exceptions.RuntimeException(str(exc))
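The `cursor.nextset()` checks added above exist because a single dbt `execute()` call can now carry several `;`-separated statements (the table materialization below appends `ALTER TABLE` statements after the CTAS), and an error raised by any statement after the first would otherwise go unnoticed. A minimal sketch of the kind of batch that reaches the cursor — relation and column names here are purely illustrative:

```sql
-- hypothetical rendered SQL sent through execute(); the CTAS runs first,
-- then each trailing statement's response is inspected via cursor.nextset()
create table analytics.dim_customer INCLUDE SCHEMA PRIVILEGES as (
    select customer_id, customer_name from staging.stg_customers
)
; ALTER TABLE analytics.dim_customer ADD CONSTRAINT pk PRIMARY KEY (customer_id) ENABLED
;
```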

16 changes: 12 additions & 4 deletions dbt/include/vertica/macros/adapters/freshness.sql
@@ -3,7 +3,15 @@
{%- endmacro %}


{% macro vertica__collect_freshness() -%}
{{ exceptions.raise_not_implemented(
'collect_freshness macro not implemented for adapter '+adapter.type()) }}
{%- endmacro %}
{% macro vertica__collect_freshness(source, loaded_at_field, filter) -%}
{% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%}
select
max({{ loaded_at_field }}) as max_loaded_at,
{{ current_timestamp() }} as snapshotted_at
from {{ source }}
{% if filter %}
where {{ filter }}
{% endif %}
{% endcall %}
{{ return(load_result('collect_freshness').table) }}
{%- endmacro %}
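With `vertica__collect_freshness` implemented, `dbt source freshness` can run against Vertica instead of raising `not implemented`. Assuming a hypothetical source table `public.orders` with a `loaded_at` timestamp column and a `filter` of `status = 'complete'`, the macro would render roughly the following query:

```sql
-- sketch of the query produced by vertica__collect_freshness; table and column names are illustrative
select
    max(loaded_at) as max_loaded_at,
    current_timestamp as snapshotted_at
from public.orders
where status = 'complete'
```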
@@ -9,39 +9,46 @@
{%- set partition_by_string = config.get('partition_by_string', default=none) -%}
{%- set partition_by_group_by_string = config.get('partition_by_group_by_string', default=none) -%}
{%- set partition_by_active_count = config.get('partition_by_active_count', default=none) -%}

{%- set primary_key_columns_by_string = config.get('primary_key_columns_by_string', default=none) -%}

create {% if temporary: -%}local temporary{%- endif %} table
{{ relation.include(database=(not temporary), schema=(not temporary)) }}
{% if temporary: -%}on commit preserve rows{%- endif %}
INCLUDE SCHEMA PRIVILEGES as (
{{ sql }}
)

{% if order_by is not none -%}
order by {{ order_by }}
{% endif -%}
{% if not temporary: %}
{% if order_by is not none -%}
order by {{ order_by }}
{% endif -%}

{% if segmented_by_string is not none -%}
segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %}
{% endif %}
{% if segmented_by_string is not none -%}
segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %}
{% endif %}

{% if no_segmentation =='True' or no_segmentation=='true' -%}
UNSEGMENTED ALL NODES
{% endif -%}
{% if no_segmentation =='True' or no_segmentation=='true' -%}
UNSEGMENTED ALL NODES
{% endif -%}

{% if ksafe is not none -%}
ksafe {{ ksafe }}
{% endif -%}

{% if partition_by_string is not none -%}
; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }}
{% if partition_by_string is not none and partition_by_group_by_string is not none -%}
group by {{ partition_by_group_by_string }}
{% endif %}
{% if partition_by_string is not none and partition_by_active_count is not none %}
SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }}
{% if ksafe is not none -%}
ksafe {{ ksafe }}
{% endif -%}

{% if partition_by_string is not none -%}
; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} PARTITION BY {{ partition_by_string }}
{% if partition_by_group_by_string is not none -%}
GROUP BY {{ partition_by_group_by_string }}
{% endif %}
{% if partition_by_active_count is not none %}
SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }}
{% endif %}
; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} REORGANIZE;
{% endif %}
{% endif %}

{% if primary_key_columns_by_string is not none -%}
; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} ADD CONSTRAINT pk PRIMARY KEY ({{ primary_key_columns_by_string }}) ENABLED
{% endif -%}
{% endif %}
;
{% endmacro %}
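To use the new constraint support, a model sets `primary_key_columns_by_string` in its config; the macro then appends an `ALTER TABLE ... ADD CONSTRAINT pk PRIMARY KEY (...) ENABLED` statement after the CTAS. A minimal sketch — model, column, and ref names are hypothetical:

```sql
-- models/dim_customer.sql (hypothetical model using the new config)
{{ config(
    materialized='table',
    primary_key_columns_by_string='customer_id, customer_key'
) }}

select
    customer_id,
    customer_key,
    customer_name
from {{ ref('stg_customers') }}
```

Because the constraint is added as a separate statement within the same `execute()` call, this is the path that relies on the multi-statement error checking added to `connections.py`.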