ray: [Core] [RFC] Add ability to filter application-level exceptions that should be retried.
In Workflows, Datasets, and other libraries/use cases, it is desirable to support retrying transient application-level errors. This is currently supported via the `retry_exceptions=True` argument:
```python
@ray.remote(retry_exceptions=True, max_retries=3)
def f():
    raise ValueError()  # retried up to 3 times
```
However, not all application-level errors are transient, and this blanket “always retry” policy will result in needless retries on unrecoverable failures. The common solution in other retry libraries is to have either:
- An allowlist of exceptions that can be retried and/or a denylist of exceptions that should not be retried.
- A predicate (callable) that is given an exception instance and returns `True` if the exception is retryable.
Some examples are `urllib3.Retry`’s `allowed_methods` and `status_forcelist`, gRPC’s retry policy with its list of retryable status codes, and Google Cloud Python `Retry`’s `predicate`.
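For reference, the predicate flavor as it appears in `google.api_core.retry` looks roughly like this (a sketch; `fetch_rows` is a placeholder for some flaky callable):

```python
from google.api_core.retry import Retry, if_exception_type

# Retry only when the raised exception matches the predicate.
retryable = Retry(predicate=if_exception_type(ConnectionError, TimeoutError))

# Wrap the flaky call; it is re-invoked only while the predicate returns True.
rows = retryable(fetch_rows)()
```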
Proposal
We should offer such an API so users can filter down to the application-level exceptions that are actually transient, preventing needless retries. The predicate callable is the most powerful option, since it lets the user multiplex on error metadata attached to the exception, such as status codes. To keep the common path's UX simple, we should also accept an allowlist of exception types.
NOTE: We ultimately decided to go with an exception allowlist to start, and will revisit an exception predicate once users wish to (1) specify a denylist, (2) use error metadata (such as status codes) to determine whether to retry an exception/error, or (3) use such error filtering in other language workers (Java, C++); in any such cases, standardizing on an exception predicate would make the most sense.
Exception Allowlist
```python
@ray.remote(
    retry_exceptions=ConnectionError,
    max_retries=3,
)
def f(a):
    if a < 0:
        raise ValueError("invalid input")
    else:
        # connect_and_add might transiently raise a ConnectionError.
        return connect_and_add(a)

# Fails without retrying the non-transient error.
ray.get(f.remote(-1))

# Retries up to 3 times.
ray.get(f.remote(5))
```
Exception Predicate
```python
@ray.remote(
    retry_exceptions=lambda e: isinstance(e, ConnectionError),
    max_retries=3,
)
def f(a):
    if a < 0:
        raise ValueError("invalid input")
    else:
        # connect_and_add might transiently raise a ConnectionError.
        return connect_and_add(a)

# Fails without retrying the non-transient error.
ray.get(f.remote(-1))

# Retries up to 3 times.
ray.get(f.remote(5))
```
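A predicate also enables the metadata-based multiplexing mentioned in the proposal, e.g., retrying only on certain status codes. A sketch, using a hypothetical `RpcError` exception type that carries a `status_code` attribute:

```python
import ray


class RpcError(Exception):
    """Hypothetical application error that carries a status code."""

    def __init__(self, status_code, message=""):
        super().__init__(message)
        self.status_code = status_code


def is_transient(e: Exception) -> bool:
    # Retry throttling and server-side errors; treat client errors as permanent.
    return isinstance(e, RpcError) and e.status_code in (429, 500, 503)


@ray.remote(retry_exceptions=is_transient, max_retries=3)
def call_service(request):
    ...  # may raise RpcError with various status codes
```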
We have a few options for the API itself:
1. Overload `retry_exceptions`, whose type would be `Union[bool, Callable[[Exception], bool]]`, with the following semantics: if `False`, don't retry application-level errors; if `True`, retry all application-level errors; if a callable, only retry application-level errors for which the provided callable predicate returns `True`.
2. Have a separate `retry_exceptions_predicate` arg that's `None` by default and only settable if `retry_exceptions=True`.
3. Add a `RetryPolicy` data class that houses all args relating to retrying, similar to the `SchedulingStrategy` data class:
```python
@dataclass
class RetryPolicy:
    num_retries: int
    retry_exceptions: bool
    exception_predicate: Callable[[Exception], bool]
```
For API conciseness and implementation simplicity, I would lean towards (1), but happy to hear other opinions.
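For illustration, option (1) could be normalized internally into a single predicate. A minimal sketch, where the helper name `to_retry_predicate` is hypothetical and not part of Ray's API:

```python
from typing import Callable, Union

RetryExceptions = Union[bool, Callable[[Exception], bool]]


def to_retry_predicate(retry_exceptions: RetryExceptions) -> Callable[[Exception], bool]:
    """Normalize the user-provided option into a single predicate."""
    if retry_exceptions is True:
        return lambda e: True   # retry all application-level errors
    if retry_exceptions is False:
        return lambda e: False  # never retry application-level errors
    if callable(retry_exceptions):
        return retry_exceptions  # user-provided predicate
    raise TypeError(
        f"retry_exceptions must be a bool or a callable predicate, got {type(retry_exceptions)}"
    )
```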
Implementation Note
For both predicate callables and allowlists/denylists of exception types, we'll need to serialize and deserialize this task option with pickle.
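A minimal sketch of that round trip, assuming the option is pickled with cloudpickle (which Ray uses for function serialization and which, unlike the standard `pickle` module, can handle lambdas):

```python
import cloudpickle

predicate = lambda e: isinstance(e, ConnectionError)

# Serialize the option when the task is submitted...
payload = cloudpickle.dumps(predicate)

# ...and deserialize it on the worker before deciding whether to retry.
should_retry = cloudpickle.loads(payload)
assert should_retry(ConnectionError()) is True
assert should_retry(ValueError()) is False
```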
About this issue
- State: closed
- Created 2 years ago
- Reactions: 1
- Comments: 15 (13 by maintainers)
This SGTM!
Doesn’t the ray.remote decorator compose with existing retrying libs & their decorators, e.g., https://tenacity.readthedocs.io/en/latest/index.html ?
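As an illustration of that composition (a sketch for discussion, not taken from the thread), a retry decorator can wrap the task body directly; note that tenacity then retries inside the same worker process, whereas Ray's `retry_exceptions` resubmits the task:

```python
import ray
from tenacity import retry, retry_if_exception_type, stop_after_attempt


@ray.remote
def f(a):
    # Retry ConnectionError up to 3 attempts within this worker process.
    @retry(
        retry=retry_if_exception_type(ConnectionError),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def connect_and_add(x):
        ...  # connect to some service and add x

    return connect_and_add(a)
```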
@rkooo567 Agreed, something like:
which would then be easy to extend in the future: