From 98f7e75715ab4bc68524ce79c44565f681295f5c Mon Sep 17 00:00:00 2001 From: tbog Date: Sun, 11 Jun 2023 17:02:21 +0700 Subject: [PATCH 1/4] Feat: add logs for submitted jobs to bigquery --- dbt/adapters/bigquery/connections.py | 43 +++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index c9c6055ac..21d863738 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -35,6 +35,7 @@ from dbt.events import AdapterLogger from dbt.events.functions import fire_event from dbt.events.types import SQLQuery +from dbt.events.contextvars import get_node_info from dbt.version import __version__ as dbt_version from dbt.dataclass_schema import ExtensibleDbtClassMixin, StrEnum @@ -454,17 +455,33 @@ def raw_execute(self, sql, use_legacy_sql=False, limit: Optional[int] = None): job_creation_timeout = self.get_job_creation_timeout_seconds(conn) job_execution_timeout = self.get_job_execution_timeout_seconds(conn) + def reopen_conn_on_error(error): + if isinstance(error, REOPENABLE_ERRORS): + logger.warning("Reopening connection after {!r}".format(error)) + self.close(conn) + self.open(conn) + return + + error_counter = _ErrorCounter(self.get_job_retries(conn)) + + @retry.Retry( + predicate=error_counter.count_error, + sleep_generator=self._retry_generator(), + deadline=self.get_job_retry_deadline_seconds(), + on_error=reopen_conn_on_error + ) def fn(): return self._query_and_results( client, sql, job_params, + error_counter=error_counter, job_creation_timeout=job_creation_timeout, job_execution_timeout=job_execution_timeout, limit=limit, ) - query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn) + query_job, iterator = fn() return query_job, iterator @@ -649,11 +666,34 @@ def fn(): self._retry_and_handle(msg="create dataset", conn=conn, fn=fn) + def _gen_submitted_log_query(self, query_job, error_counter): + if ( + query_job.job_id is not None + and query_job.project is not None + and query_job.location is not None + ): + attemp = error_counter.error_count + maximum_attemp = error_counter.retries + bq_link = self._bq_job_link( + query_job.location, + query_job.project, + query_job.job_id, + ) + node_info = get_node_info() + materialized = node_info.get("materialized") + resource_type = node_info.get("resource_type") + node_name = node_info.get("node_name") + + logger.info( + f"[{attemp}/{maximum_attemp} attempts for {materialized} {resource_type} {node_name} {bq_link}]" + ) + def _query_and_results( self, client, sql, job_params, + error_counter, job_creation_timeout=None, job_execution_timeout=None, limit: Optional[int] = None, @@ -662,6 +702,7 @@ def _query_and_results( # Cannot reuse job_config if destination is set and ddl is used job_config = google.cloud.bigquery.QueryJobConfig(**job_params) query_job = client.query(query=sql, job_config=job_config, timeout=job_creation_timeout) + self._gen_submitted_log_query(query_job=query_job, error_counter=error_counter) iterator = query_job.result(max_results=limit, timeout=job_execution_timeout) return query_job, iterator From 18d3dd0b23f22d76d38b21038bbb2e2146165cd2 Mon Sep 17 00:00:00 2001 From: tbog Date: Sun, 11 Jun 2023 17:12:59 +0700 Subject: [PATCH 2/4] Fix: add conn params to get_job_retry_deadline_seconds --- dbt/adapters/bigquery/connections.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index 21d863738..dac316130 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -467,7 +467,7 @@ def reopen_conn_on_error(error): @retry.Retry( predicate=error_counter.count_error, sleep_generator=self._retry_generator(), - deadline=self.get_job_retry_deadline_seconds(), + deadline=self.get_job_retry_deadline_seconds(conn), on_error=reopen_conn_on_error ) def fn(): From a554ef7bc7a1de50770ee795593ca917581e6750 Mon Sep 17 00:00:00 2001 From: tbog Date: Sun, 11 Jun 2023 17:48:15 +0700 Subject: [PATCH 3/4] Ref: add comment --- dbt/adapters/bigquery/connections.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/dbt/adapters/bigquery/connections.py b/dbt/adapters/bigquery/connections.py index dac316130..8e038a242 100644 --- a/dbt/adapters/bigquery/connections.py +++ b/dbt/adapters/bigquery/connections.py @@ -672,6 +672,7 @@ def _gen_submitted_log_query(self, query_job, error_counter): and query_job.project is not None and query_job.location is not None ): + # get attemp num attemp = error_counter.error_count maximum_attemp = error_counter.retries bq_link = self._bq_job_link( @@ -679,6 +680,8 @@ def _gen_submitted_log_query(self, query_job, error_counter): query_job.project, query_job.job_id, ) + + # get node info node_info = get_node_info() materialized = node_info.get("materialized") resource_type = node_info.get("resource_type") From 55a4082d1cea0bb96ead433f96bf8df179bb1abf Mon Sep 17 00:00:00 2001 From: tbog Date: Sun, 11 Jun 2023 21:00:51 +0700 Subject: [PATCH 4/4] Ref: add changelog entry from changie --- .changes/unreleased/Features-20230611-210014.yaml | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 .changes/unreleased/Features-20230611-210014.yaml diff --git a/.changes/unreleased/Features-20230611-210014.yaml b/.changes/unreleased/Features-20230611-210014.yaml new file mode 100644 index 000000000..628014fc4 --- /dev/null +++ b/.changes/unreleased/Features-20230611-210014.yaml @@ -0,0 +1,6 @@ +kind: Features +body: Provide bigquery job link after it's submitted +time: 2023-06-11T21:00:14.376652604+07:00 +custom: + Author: tbog + Issue: "696"