Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Retry query jobs and use info-level logging for job link, etc #768

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/unreleased/Features-20230611-210014.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
kind: Features
body: Provide bigquery job link after it's submitted
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm going to proactively update the changelog entry for this PR to be more precise. I will commit (rather than just suggest) with the intent that we can improve it further as-needed.

Suggested change
body: Provide bigquery job link after it's submitted
body: Retry query jobs and use info-level logging for job link, etc

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update: I can't push commits to this branch, so I'll just make this a blocking review instead.

time: 2023-06-11T21:00:14.376652604+07:00
custom:
Author: tbog
Issue: "696"
46 changes: 45 additions & 1 deletion dbt/adapters/bigquery/connections.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
from dbt.events import AdapterLogger
from dbt.events.functions import fire_event
from dbt.events.types import SQLQuery
from dbt.events.contextvars import get_node_info
from dbt.version import __version__ as dbt_version

from dbt.dataclass_schema import ExtensibleDbtClassMixin, StrEnum
Expand Down Expand Up @@ -454,17 +455,33 @@ def raw_execute(self, sql, use_legacy_sql=False, limit: Optional[int] = None):
job_creation_timeout = self.get_job_creation_timeout_seconds(conn)
job_execution_timeout = self.get_job_execution_timeout_seconds(conn)

def reopen_conn_on_error(error):
if isinstance(error, REOPENABLE_ERRORS):
logger.warning("Reopening connection after {!r}".format(error))
self.close(conn)
self.open(conn)
return

error_counter = _ErrorCounter(self.get_job_retries(conn))

@retry.Retry(
predicate=error_counter.count_error,
sleep_generator=self._retry_generator(),
deadline=self.get_job_retry_deadline_seconds(conn),
on_error=reopen_conn_on_error
)
def fn():
return self._query_and_results(
client,
sql,
job_params,
error_counter=error_counter,
job_creation_timeout=job_creation_timeout,
job_execution_timeout=job_execution_timeout,
limit=limit,
)

query_job, iterator = self._retry_and_handle(msg=sql, conn=conn, fn=fn)
query_job, iterator = fn()

return query_job, iterator

Expand Down Expand Up @@ -649,11 +666,37 @@ def fn():

self._retry_and_handle(msg="create dataset", conn=conn, fn=fn)

def _gen_submitted_log_query(self, query_job, error_counter):
    """Log an info-level message linking to the BigQuery console page for a
    just-submitted query job, tagged with the retry-attempt count and the
    dbt node the query belongs to.

    Fixes misspelled local names (``attemp`` -> ``attempt``) from the
    original; the emitted log line's content is unchanged, since only the
    interpolated values appear in it.

    :param query_job: the ``google.cloud.bigquery`` job returned by
        ``client.query`` (read for ``job_id``, ``project``, ``location``)
    :param error_counter: counter tracking retry attempts for this query;
        ``error_count`` is the current attempt, ``retries`` the configured
        maximum (set from ``get_job_retries(conn)`` by the caller)
    """
    # Only log once the job has enough identity to build a console link.
    if (
        query_job.job_id is not None
        and query_job.project is not None
        and query_job.location is not None
    ):
        # Current attempt number and the configured retry maximum.
        attempt = error_counter.error_count
        max_attempts = error_counter.retries
        bq_link = self._bq_job_link(
            query_job.location,
            query_job.project,
            query_job.job_id,
        )

        # Identify the dbt node being executed for the log message.
        # NOTE(review): assumes get_node_info() returns a dict — confirm.
        node_info = get_node_info()
        materialized = node_info.get("materialized")
        resource_type = node_info.get("resource_type")
        node_name = node_info.get("node_name")

        logger.info(
            f"[{attempt}/{max_attempts} attempts for {materialized} {resource_type} {node_name} {bq_link}]"
        )

def _query_and_results(
    self,
    client,
    sql,
    job_params,
    error_counter,
    job_creation_timeout=None,
    job_execution_timeout=None,
    limit: Optional[int] = None,
):
    """Submit ``sql`` to BigQuery and block until results are available.

    :param client: the ``google.cloud.bigquery`` client used to submit the job
    :param sql: the query text to execute
    :param job_params: keyword arguments expanded into ``QueryJobConfig``
    :param error_counter: retry-attempt counter, forwarded to the
        submitted-job log message
    :param job_creation_timeout: seconds to wait for job submission
    :param job_execution_timeout: seconds to wait for job completion
    :param limit: optional cap on the number of result rows fetched
    :return: tuple of (query job, row iterator)
    """
    # A fresh QueryJobConfig is built on every call: the config cannot be
    # reused when a destination is set and DDL is used.
    config = google.cloud.bigquery.QueryJobConfig(**job_params)
    query_job = client.query(
        query=sql,
        job_config=config,
        timeout=job_creation_timeout,
    )
    # Surface the BigQuery console link as soon as the job is submitted,
    # before (possibly slow) result retrieval begins.
    self._gen_submitted_log_query(query_job=query_job, error_counter=error_counter)
    rows = query_job.result(max_results=limit, timeout=job_execution_timeout)

    return query_job, rows
Expand Down