dask-cloudprovider: "Rate exceeded" error on FargateCluster

from dask.distributed import Client
from dask_cloudprovider import FargateCluster

cluster = FargateCluster(n_workers=1, image='rsignell/pangeo-worker:2020-01-23c')
client = Client(cluster)

This worked on my SageMaker instance in us-west-2, but failed on my us-east-1 instance with:

Task exception was never retrieved
future: <Task finished coro=<_wrap_awaitable() done, defined at /home/ec2-user/SageMaker/myenvs/pangeo/lib/python3.6/asyncio/tasks.py:530> exception=ClientError('An error occurred (ThrottlingException) when calling the DescribeTasks operation (reached max retries: 4): Rate exceeded',)>
Traceback (most recent call last):
  File "/home/ec2-user/SageMaker/myenvs/pangeo/lib/python3.6/asyncio/tasks.py", line 537, in _wrap_awaitable
    return (yield from awaitable.__await__())
  File "/home/ec2-user/SageMaker/myenvs/pangeo/lib/python3.6/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 130, in _
    await self.start()
  File "/home/ec2-user/SageMaker/myenvs/pangeo/lib/python3.6/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 240, in start
    await self._update_task()
  File "/home/ec2-user/SageMaker/myenvs/pangeo/lib/python3.6/site-packages/dask_cloudprovider/providers/aws/ecs.py", line 158, in _update_task
    cluster=self.cluster_arn, tasks=[self.task_arn]
  File "/home/ec2-user/SageMaker/myenvs/pangeo/lib/python3.6/site-packages/aiobotocore/client.py", line 102, in _make_api_call
    raise error_class(parsed_response, operation_name)
botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DescribeTasks operation (reached max retries: 4): Rate exceeded

It looks like the PR https://github.com/dask/dask-cloudprovider/pull/44 was designed to address these problems, but I'm still hitting them despite running v0.1.1, which includes that PR. Are others still experiencing this?

About this issue

  • Original URL
  • State: closed
  • Created 4 years ago
  • Reactions: 5
  • Comments: 24 (10 by maintainers)

Most upvoted comments

I’m having a similar issue, except for me it’s botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DeregisterTaskDefinition operation (reached max retries: 4): Rate exceeded

Hi - I’m getting the same error as a previous user in this thread.

Error retrieving security group information for [sg-0176ba5002edd59c3]: Request limit exceeded. (ErrorCode: RequestLimitExceeded)

To my knowledge, there isn’t anything I can do to increase this from Service Quotas. This only happens when I send off a large Batch array job on Fargate.

Thanks @rpanai. Does that happen consistently?

Hi, I'm still having the same problem despite using the latest versions of Dask and dask-cloudprovider. If I'm processing many inputs with bag or delayed, I continue to receive the following error

botocore.exceptions.ClientError: An error occurred (ThrottlingException) when calling the DescribeTasks operation (reached max retries: 4): Rate exceeded

long after all the computation is done. As a consequence my script never ends and, since I'm using Airflow to manage my pipeline, I get stuck.

Yes that error is expected. In order to avoid that you will need to request AWS to increase your service limits.

This version is meant to honor max_tries:

diff --git a/dask_cloudprovider/providers/aws/ecs.py b/dask_cloudprovider/providers/aws/ecs.py
index d178b3a..503fbcf 100644
--- a/dask_cloudprovider/providers/aws/ecs.py
+++ b/dask_cloudprovider/providers/aws/ecs.py
@@ -28,6 +28,32 @@ DEFAULT_TAGS = {
 }  # Package tags to apply to all resources
 
 
+MAX_THROTTLING_TRIES = 10  # arbitrary...
+
+
+async def retry_when_throttled(
+    func, *args, max_tries=MAX_THROTTLING_TRIES, **kwargs
+):
+    current_try = 0
+
+    while True:
+        try:
+            return await func(*args, **kwargs)
+        except ClientError as e:
+            if e.response["Error"]["Code"] == "ThrottlingException":
+                backoff_duration = get_sleep_duration(current_try)
+                current_try += 1
+                if current_try == max_tries:
+                    raise
+                warnings.warn(
+                    "Rate limit exceeded, retrying after delay.",
+                    RuntimeWarning,
+                )
+                await asyncio.sleep(backoff_duration)
+            else:
+                raise
+
+
 class Task:
     """ A superclass for managing ECS Tasks
     Parameters
@@ -296,7 +322,6 @@ class Task:
         )
 
     async def logs(self, follow=False):
-        current_try = 0
         next_token = None
         read_from = 0
 
@@ -304,13 +329,15 @@ class Task:
             try:
                 async with self._client("logs") as logs:
                     if next_token:
-                        l = await logs.get_log_events(
+                        l = await retry_when_throttled(
+                            logs.get_log_events,
                             logGroupName=self.log_group,
                             logStreamName=self._log_stream_name,
                             nextToken=next_token,
                         )
                     else:
-                        l = await logs.get_log_events(
+                        l = await retry_when_throttled(
+                            logs.get_log_events,
                             logGroupName=self.log_group,
                             logStreamName=self._log_stream_name,
                             startTime=read_from,
@@ -327,18 +354,6 @@ class Task:
                 for event in l["events"]:
                     read_from = event["timestamp"]
                     yield event["message"]
-            except ClientError as e:
-                if e.response["Error"]["Code"] == "ThrottlingException":
-                    warnings.warn(
-                        "get_log_events rate limit exceeded, retrying after delay.",
-                        RuntimeWarning,
-                    )
-                    backoff_duration = get_sleep_duration(current_try)
-                    await asyncio.sleep(backoff_duration)
-                    current_try += 1
-                else:
-                    raise
-
     def __repr__(self):
         return "<ECS Task %s: status=%s>" % (type(self).__name__, self.status)
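For reference, `get_sleep_duration` in the patch above is the library's existing backoff helper; a minimal illustrative version (exponential backoff with full jitter, where the constants are my own guesses, not dask-cloudprovider's actual values) would look like:

```python
import random

def get_sleep_duration(current_try, base=1.5, max_backoff=30.0):
    # Exponential backoff: the wait doubles on each retry, capped at
    # max_backoff; full jitter picks a random duration in [0, backoff]
    # so simultaneous retriers don't all hit the API at the same instant.
    backoff = min(max_backoff, base * (2 ** current_try))
    return random.uniform(0, backoff)
```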