Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP]: retry with only_system #2131

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 71 additions & 3 deletions metaflow/plugins/retry_decorator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,53 @@
import sys
import traceback
from functools import wraps
from metaflow.decorators import StepDecorator
from metaflow.exception import MetaflowException
from metaflow.exception import MetaflowException, METAFLOW_EXIT_DISALLOW_RETRY
from metaflow.metaflow_config import MAX_ATTEMPTS


# Lower-case substrings that, when found in an exception's message, mark the
# failure as system-level (infrastructure/resource) rather than user-code.
SYSTEM_ERROR_MESSAGE_PATTERNS = [
    "spot instance interruption",
    "connection reset by peer",
    "connection timed out",
    "resource temporarily unavailable",
    "internal error",
    "service unavailable",
    "oom-killer",
    "out of memory",
    "sigterm",
    "sigkill",
    "ehostunreach",
    "no space left on device",
    "too many open files",
]


def is_system_error(exception):
    """
    Heuristically decide whether `exception` is a system-level failure.

    An exception is treated as a system error when either:
      - its string form contains (case-insensitively) one of the
        substrings in SYSTEM_ERROR_MESSAGE_PATTERNS, or
      - it carries an `errno` attribute whose value is one of a fixed set
        of OS error codes (interrupted call, I/O error, resource
        exhaustion, permission, connection reset/timeout).

    Parameters
    ----------
    exception : BaseException
        The exception to classify.

    Returns
    -------
    bool
        True if the exception looks like a system error, False otherwise.
    """
    # Imported locally so this block is self-contained; the symbolic
    # constants resolve to the correct numeric value on every platform,
    # whereas hard-coded numbers such as 11/104/110 are only valid on Linux.
    import errno

    error_msg = str(exception).lower()
    if any(pattern.lower() in error_msg for pattern in SYSTEM_ERROR_MESSAGE_PATTERNS):
        return True

    system_errnos = {
        errno.EINTR,
        errno.EIO,
        errno.EAGAIN,
        errno.ENOMEM,
        errno.EACCES,
        errno.EMFILE,
        errno.ECONNRESET,
        errno.ETIMEDOUT,
    }

    # Exceptions without an `errno` attribute (or with errno=None) fall
    # through to False because None is never a member of the set.
    return getattr(exception, "errno", None) in system_errnos


class RetryDecorator(StepDecorator):
"""
Specifies the number of times the task corresponding
Expand All @@ -22,10 +67,12 @@ class RetryDecorator(StepDecorator):
Number of times to retry this task.
minutes_between_retries : int, default 2
Number of minutes between retries.
only_system : bool, default False
If True, only retry on system-level failures
"""

name = "retry"
defaults = {"times": "3", "minutes_between_retries": "2"}
defaults = {"times": "3", "minutes_between_retries": "2", "only_system": False}

def step_init(self, flow, graph, step, decos, environment, flow_datastore, logger):
# The total number of attempts must not exceed MAX_ATTEMPTS.
Expand All @@ -37,4 +84,25 @@ def step_init(self, flow, graph, step, decos, environment, flow_datastore, logge
)

def step_task_retry_count(self):
    """
    Return the retry budget for this step as a 2-tuple.

    The configured `times` attribute is converted to an int and placed in
    exactly one slot of the tuple; the other slot is 0:
      - `only_system` falsy  -> (times, 0)
      - `only_system` truthy -> (0, times)

    NOTE(review): presumably the tuple is (user-code retries, system/error
    retries) as consumed by the runtime -- confirm against StepDecorator.
    """
    # The previous unconditional `return int(self.attributes["times"]), 0`
    # that preceded this logic made the only_system branch unreachable.
    times = int(self.attributes["times"])
    if self.attributes["only_system"]:
        return 0, times
    return times, 0

def task_decorate(
    self, step_func, flow, graph, retry_count, max_user_code_retries, ubf_context
):
    """
    Optionally wrap the step function so that only system errors propagate.

    When the `only_system` attribute is falsy, `step_func` is returned
    unchanged. Otherwise a wrapper is returned that runs `step_func` and,
    on any `Exception`:
      - re-raises it if `is_system_error` classifies it as a system error;
      - otherwise prints the traceback and terminates the process via
        `sys.exit(METAFLOW_EXIT_DISALLOW_RETRY)`.

    NOTE(review): the exit code name suggests the runtime will not retry
    the task after such an exit -- confirm where the code is interpreted.
    """
    if not self.attributes["only_system"]:
        return step_func

    @wraps(step_func)
    def fallback_step(*step_args, **step_kwargs):
        try:
            step_func(*step_args, **step_kwargs)
        except Exception as err:
            if is_system_error(err):
                # System-level failure: let it propagate unchanged.
                raise
            # Non-system failure: report it, then exit with the dedicated
            # status code instead of raising.
            traceback.print_exc()
            sys.exit(METAFLOW_EXIT_DISALLOW_RETRY)

    return fallback_step
Loading