diff --git a/README.md b/README.md index b770109..d834f9f 100644 --- a/README.md +++ b/README.md @@ -31,13 +31,13 @@ Below is a table for what features the current Vertica adapter supports for dbt. * **Passes Test** -The testes have passed, though haven't tested in a production like environment ### Vertica Features Below is a table for what features the current Vertica adapter supports for Vertica. This is constantly improving and changing as both dbt adds new functionality, as well as the dbt-vertica driver improves. -| Vertica Features | Supported | +| Vertica Features | Supported | | --------------------- | --------- | | Created/Drop Schema | Yes | | Analyze Statistics | No | | Purge Delete Vectors | No | | Projection Management | No | -| Primary/Unique Keys | No | +| Primary/Unique Keys | Primary key only | | Other DDLs | No | ## Installation @@ -54,14 +54,14 @@ your-profile: type: vertica # Don't change this! host: [hostname] port: [port] # or your custom port (optional) - username: [your username] - password: [your password] - database: [database name] - schema: [dbt schema] + username: [your username] + password: [your password] + database: [database name] + schema: [dbt schema] connection_load_balance: True backup_server_node: [list of backup hostnames or IPs] retries: [1 or more] - threads: [1 or more] + threads: [1 or more] target: dev ``` @@ -117,9 +117,7 @@ You need the pytest dbt adapter: pip3 install dbt-tests-adapter==1.3.0 Run tests via: - + pytest tests/functional/adapter/ - # run an individual test + # run an individual test pytest tests/functional/adapter/test_basic.py - - diff --git a/dbt/adapters/vertica/connections.py b/dbt/adapters/vertica/connections.py index 5c1a916..94513a6 100644 --- a/dbt/adapters/vertica/connections.py +++ b/dbt/adapters/vertica/connections.py @@ -55,7 +55,7 @@ class verticaCredentials(Credentials): # backup_server_node: Optional[str] = None # additional_info = { - # 'password': str, + # 'password': str, # 'backup_server_node': list# invalid value to be set in a connection string # } @@ -97,9 +97,9 @@ def open(cls, connection): 'connection_load_balance':credentials.connection_load_balance, 'session_label': f'dbt_{credentials.username}', 'retries': credentials.retries, - + 'backup_server_node':credentials.backup_server_node, - + } # if credentials.ssl.lower() in {'true', 'yes', 'please'}: @@ -119,7 +119,7 @@ def open(cls, connection): context = ssl.create_default_context() conn_info['ssl'] = context logger.debug(f'SSL is on') - + def connect(): handle = vertica_python.connect(**conn_info) logger.debug(f':P Connection work {handle}') @@ -127,8 +127,8 @@ def connect(): connection.handle = handle logger.debug(f':P Connected to database: {credentials.database} at {credentials.host} at {handle}') return handle - - + + except Exception as exc: @@ -184,6 +184,45 @@ def cancel(self, connection): logger.debug(':P Cancel query') connection.handle.cancel() + @classmethod + def get_result_from_cursor(cls, cursor: Any) -> agate.Table: + data: List[Any] = [] + column_names: List[str] = [] + + if cursor.description is not None: + column_names = [col[0] for col in cursor.description] + rows = cursor.fetchall() + + # check result for every query if there are some queries with ; separator + while cursor.nextset(): + check = cursor._message + if isinstance(check, ErrorResponse): + logger.debug(f'Cursor message is: {check}') + self.release() + raise dbt.exceptions.DatabaseException(str(check)) + + data = cls.process_results(column_names, rows) + + return dbt.clients.agate_helper.table_from_data_flat(data, column_names) + + def execute( + self, sql: str, auto_begin: bool = False, fetch: bool = False + ) -> Tuple[AdapterResponse, agate.Table]: + sql = self._add_query_comment(sql) + _, cursor = self.add_query(sql, auto_begin) + response = self.get_response(cursor) + if fetch: + table = self.get_result_from_cursor(cursor) + else: + table = dbt.clients.agate_helper.empty_table() + while cursor.nextset(): + check = cursor._message + if isinstance(check, vertica_python.vertica.messages.ErrorResponse): + logger.debug(f'Cursor message is: {check}') + self.release() + raise dbt.exceptions.DatabaseException(str(check)) + return response, table + @contextmanager def exception_handler(self, sql): @@ -197,4 +236,3 @@ def exception_handler(self, sql): logger.debug(f':P Error: {exc}') self.release() raise dbt.exceptions.RuntimeException(str(exc)) - diff --git a/dbt/include/vertica/macros/adapters/freshness.sql b/dbt/include/vertica/macros/adapters/freshness.sql index 0f90672..e145219 100644 --- a/dbt/include/vertica/macros/adapters/freshness.sql +++ b/dbt/include/vertica/macros/adapters/freshness.sql @@ -3,7 +3,15 @@ {%- endmacro %} -{% macro vertica__collect_freshness() -%} - {{ exceptions.raise_not_implemented( - 'collect_freshness macro not implemented for adapter '+adapter.type()) }} -{%- endmacro %} \ No newline at end of file +{% macro vertica__collect_freshness(source, loaded_at_field, filter) -%} + {% call statement('collect_freshness', fetch_result=True, auto_begin=False) -%} + select + max({{ loaded_at_field }}) as max_loaded_at, + {{ current_timestamp() }} as snapshotted_at + from {{ source }} + {% if filter %} + where {{ filter }} + {% endif %} + {% endcall %} + {{ return(load_result('collect_freshness').table) }} +{%- endmacro %} diff --git a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql index a2567ec..a45b64b 100644 --- a/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql +++ b/dbt/include/vertica/macros/materializations/models/table/create_table_as.sql @@ -9,7 +9,8 @@ {%- set partition_by_string = config.get('partition_by_string', default=none) -%} {%- set partition_by_group_by_string = config.get('partition_by_group_by_string', default=none) -%} {%- set partition_by_active_count = config.get('partition_by_active_count', default=none) -%} - + {%- set primary_key_columns_by_string = config.get('primary_key_columns_by_string', default=none) -%} + create {% if temporary: -%}local temporary{%- endif %} table {{ relation.include(database=(not temporary), schema=(not temporary)) }} {% if temporary: -%}on commit preserve rows{%- endif %} @@ -17,31 +18,37 @@ {{ sql }} ) - {% if order_by is not none -%} - order by {{ order_by }} - {% endif -%} + {% if not temporary: %} + {% if order_by is not none -%} + order by {{ order_by }} + {% endif -%} - {% if segmented_by_string is not none -%} - segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %} - {% endif %} + {% if segmented_by_string is not none -%} + segmented BY {{ segmented_by_string }} {% if segmented_by_all_nodes %} ALL NODES {% endif %} + {% endif %} - {% if no_segmentation =='True' or no_segmentation=='true' -%} - UNSEGMENTED ALL NODES - {% endif -%} + {% if no_segmentation =='True' or no_segmentation=='true' -%} + UNSEGMENTED ALL NODES + {% endif -%} - {% if ksafe is not none -%} - ksafe {{ ksafe }} - {% endif -%} - - {% if partition_by_string is not none -%} - ; alter table {{ relation.include(database=(not temporary), schema=(not temporary)) }} partition BY {{ partition_by_string }} - {% if partition_by_string is not none and partition_by_group_by_string is not none -%} - group by {{ partition_by_group_by_string }} - {% endif %} - {% if partition_by_string is not none and partition_by_active_count is not none %} - SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }} + {% if ksafe is not none -%} + ksafe {{ ksafe }} + {% endif -%} + + {% if partition_by_string is not none -%} + ; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} PARTITION BY {{ partition_by_string }} + {% if partition_by_group_by_string is not none -%} + GROUP BY {{ partition_by_group_by_string }} + {% endif %} + {% if partition_by_active_count is not none %} + SET ACTIVEPARTITIONCOUNT {{ partition_by_active_count }} + {% endif %} + ; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} REORGANIZE; {% endif %} - {% endif %} + + {% if primary_key_columns_by_string is not none -%} + ; ALTER TABLE {{ relation.include(database=(not temporary), schema=(not temporary)) }} ADD CONSTRAINT pk PRIMARY KEY ({{ primary_key_columns_by_string }}) ENABLED + {% endif -%} + {% endif %} ; {% endmacro %} -