ray: [Core] [RFC] Add ability to filter application-level exceptions that should be retried.

In Workflows, Datasets, and other libraries/use cases, it is desirable to support retrying transient application-level errors. This is currently supported via the retry_exceptions=True argument:

import ray

@ray.remote(retry_exceptions=True, max_retries=3)
def f():
    raise ValueError()  # retried up to 3 times

However, not all application-level errors are transient, and this blanket “always retry” policy will result in needless retries on unrecoverable failures. The common solution in other retry libraries is to have either:

  1. An allowlist of exceptions that can be retried and/or a denylist of exceptions that should not be retried.
  2. A predicate (callable) that is given an exception instance and returns True if the exception is retryable.

Some examples are urllib3.Retry’s allowed_methods and status_forcelist, gRPC’s retry policy with its list of retryable status codes, and the predicate argument of Google Cloud Python’s Retry.
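
For instance, urllib3’s Retry combines both kinds of filters (a minimal sketch; allowed_methods requires urllib3 >= 1.26):

from urllib3.util.retry import Retry

# Retry only idempotent GETs, and only on these (transient) status codes.
retry = Retry(total=3, allowed_methods=["GET"], status_forcelist=[429, 503])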

Proposal

We should offer such an API so users can restrict retries to the application-level exceptions that are actually transient, preventing needless retries. The predicate callable is the most powerful option, since it lets the user branch on error metadata attached to the exception, such as status codes. To keep the common path ergonomic, we should also accept an allowlist of exception types.

NOTE: We ultimately decided to go with an exception allowlist to start, and will revisit an exception predicate once users wish to (1) specify a denylist, (2) use error metadata (such as status codes) to determine whether to retry an exception/error, or (3) use such error filtering in other language workers (Java, C++); in any such cases, standardizing on an exception predicate would make the most sense.

Exception Allowlist

@ray.remote(
    retry_exceptions=[ConnectionError],
    max_retries=3,
)
def f(a):
    if a < 0:
        raise ValueError("invalid input")
    else:
        # connect_and_add might transiently raise a ConnectionError
        return connect_and_add(a)

# Fails without retrying the non-transient error.
ray.get(f.remote(-1))

# Retries up to 3 times.
ray.get(f.remote(5))

Exception Predicate

@ray.remote(
    retry_exceptions=lambda e: isinstance(e, ConnectionError),
    max_retries=3,
)
def f(a):
    if a < 0:
        raise ValueError("invalid input")
    else:
        # connect_and_add might transiently raise a ConnectionError
        return connect_and_add(a)

# Fails without retrying the non-transient error.
ray.get(f.remote(-1))

# Retries up to 3 times.
ray.get(f.remote(5))

We have a few options for the API itself:

  1. Overload retry_exceptions, whose type would be Union[bool, Callable[[Exception], bool]] with the following semantics: if False, don’t retry application-level errors; if True, retry all application-level errors; if a callable, only retry application-level errors for which the provided callable predicate returns True.
  2. Have a separate retry_exceptions_predicate arg that’s None by default and only settable if retry_exceptions=True.
  3. Add a RetryPolicy data class that houses all retry-related args, similar to the SchedulingStrategy data class:

class RetryPolicy:
    num_retries: int
    retry_exceptions: bool
    exception_predicate: Callable[[Exception], bool]

For API conciseness and implementation simplicity, I would lean towards (1), but I’m happy to hear other opinions.
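
For illustration, the dispatch semantics of (1) reduce to a few lines (should_retry is a hypothetical internal helper, not an existing Ray API):

from typing import Callable, Union

def should_retry(retry_exceptions: Union[bool, Callable[[Exception], bool]],
                 e: Exception) -> bool:
    # False: never retry; True: always retry; callable: ask the predicate.
    if isinstance(retry_exceptions, bool):
        return retry_exceptions
    return retry_exceptions(e)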

Implementation Note

For both predicate callables and allowlist/denylist collections of exception types, we’ll need to serialize and deserialize this task option with pickle.
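
As a sanity check, a round-trip sketch (assuming cloudpickle, which Ray already uses to ship remote functions, since plain pickle can’t serialize lambdas):

import cloudpickle

predicate = lambda e: isinstance(e, ConnectionError)
# Round-trip the predicate the way the task option would be shipped to workers.
restored = cloudpickle.loads(cloudpickle.dumps(predicate))
assert restored(ConnectionError()) and not restored(ValueError())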

Most upvoted comments

@rkooo567 @ericl @scv119 How about we create another, larger-scope RFC covering the introduction of a RetryPolicy API that will include discussion of:

  • Backoff configuration (initial backoff, max backoff, backoff factor, jitter).
  • System-level vs. application-level retry policies.
  • Consolidation of normal task retries and actor (task) retries into this RetryPolicy.

This SGTM!

Doesn’t the ray.remote decorator compose with existing retrying libs & their decorators, e.g., https://tenacity.readthedocs.io/en/latest/index.html ?
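
A minimal sketch of the composition the question refers to (tenacity retries would then run inside the worker process, within a single Ray task attempt; connect_and_add is the hypothetical helper from the examples above):

import ray
from tenacity import retry, retry_if_exception_type, stop_after_attempt

@ray.remote
@retry(stop=stop_after_attempt(3),
       retry=retry_if_exception_type(ConnectionError))
def f(a):
    # connect_and_add may transiently raise ConnectionError;
    # tenacity retries it in-process before the task attempt fails.
    return connect_and_add(a)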

@rkooo567 Agreed, something like:

class RetryPolicy:
    num_retries: int
    retry_exceptions: bool
    exception_predicate: Callable[[Exception], bool]

which would then be easy to extend in the future:

from typing import Callable, Tuple, Union

class RetryPolicy:
    num_retries: int
    retry_exceptions: bool
    exception_predicate: Callable[[Exception], bool]
    initial_backoff: int
    max_backoff: int
    backoff_factor: int
    jitter: Union[int, Tuple[int, int]]
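
As a rough illustration of the schedule those backoff fields would produce (field names mirror the sketch above; the scalar jitter is a simplification):

import random

def next_backoff(attempt: int, initial_backoff: float, max_backoff: float,
                 backoff_factor: float, jitter: float) -> float:
    # Exponential backoff capped at max_backoff, plus uniform jitter.
    delay = min(initial_backoff * backoff_factor ** attempt, max_backoff)
    return delay + random.uniform(0, jitter)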