distributed: Worker stuck in closing_gracefully state
(Note: @mrocklin heavily edited this issue; the words are his (sorry for impersonating you, Brett), but the logs come from Brett)
We’ve found that workers can be stuck in a closing_gracefully state indefinitely. Here is some context:
Cluster
We’re running an adaptively scaled cluster of roughly 4000 workers.
cluster = KubeHelmCluster(release_name=release_name) # custom wrapper around dask_kubernetes.KubeCluster
cluster.adapt(minimum=0, maximum=4000, interval="10s", target_duration="60s")
Computation
Our workload is pretty simple: we read a bunch of data, call map_partitions, and that's it. We see that the cluster scales up pretty fast, and then scales down pretty fast.
dd.read_parquet("gs://.../*.parquet").map_partitions(len).compute() # ~11k parquet files
Some findings
Both the scheduler and the worker agree that it is in a closing gracefully state (this is after a long while)
In [30]: c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.workers[w].status)
Out[30]: <Status.closing_gracefully: 'closing_gracefully'>
In [31]: c.run(lambda dask_worker: str(dask_worker), workers=["tcp://10.36.228.19:34273"])
Out[31]: {'tcp://10.36.228.19:34273': "<Worker 'tcp://10.36.228.19:34273', name: 128, status: closing_gracefully, stored: 1, running: 0/4, ready: 3, comm: 0, waiting: 0>"}
Interestingly, they both also agree that there are three tasks ready to go. An artificial call to Worker._ensure_computing at this point does nothing, because the first check in that method is if self.status != Status.running: return.
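For reference, the kind of inspection above can be bundled into a single client call. This is only a rough diagnostic sketch, assuming a connected Client c and the stuck worker address in w; the attribute names follow the worker repr above and may differ between distributed versions.
def inspect_worker(dask_worker):
    # Mirror the fields shown in the worker repr: status plus queued/running work.
    return {
        "status": str(dask_worker.status),
        "ready": len(dask_worker.ready),
        "executing": dask_worker.executing_count,
    }

worker_view = c.run(inspect_worker, workers=[w])
scheduler_view = c.run_on_scheduler(lambda dask_scheduler: str(dask_scheduler.workers[w].status))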
Here are the logs on the scheduler that refer to this worker.
(.venv) ➜ model git:(ta_backfill) ✗ kl brett-f4d63eb0-daskscheduler-667b4fc95-f897l | grep '10.36.228.19:34273'
2022-04-27 13:22:17,287 - distributed.scheduler - INFO - Register worker <WorkerState 'tcp://10.36.228.19:34273', name: 128, status: undefined, memory: 0, processing: 0>
2022-04-27 13:22:17,291 - distributed.scheduler - INFO - Starting worker compute stream, tcp://10.36.228.19:34273
2022-04-27 13:23:01,001 - distributed.scheduler - INFO - Retiring worker tcp://10.36.228.19:34273
2022-04-27 13:23:01,448 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; 3 keys are being moved away.
2022-04-27 13:23:02,341 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; 3 keys are being moved away.
2022-04-27 13:23:03,716 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; 2 keys are being moved away.
2022-04-27 13:23:05,544 - distributed.active_memory_manager - INFO - Retiring worker tcp://10.36.228.19:34273; no unique keys need to be moved away.
And the events from the scheduler’s perspective
In [32]: c.run_on_scheduler(lambda dask_scheduler: dask_scheduler.events[w])
Out[32]:
deque([(1651065737.2876427, {'action': 'add-worker'}),
(1651065737.629527,
{'action': 'worker-status-change',
'prev-status': 'undefined',
'status': 'running'}),
(1651065793.225713,
{'action': 'missing-data',
'key': "('_split-659a14d513522415a58d9ec3d470e072', 8087)"}),
(1651065793.2970057,
{'action': 'missing-data',
'key': "('_split-659a14d513522415a58d9ec3d470e072', 8087)"})])
And the logs on the worker side
(.venv) ➜ model git:(ta_backfill) ✗ kl -f brett-f4d63eb0-daskworkers-584d467f76-9zctq
/usr/src/python/distributed/distributed/cli/dask_worker.py:319: FutureWarning: The --nprocs flag will be removed in a future release. It has been renamed to --nworkers.
warnings.warn(
2022-04-27 13:22:12,055 - distributed.nanny - INFO - Start Nanny at: 'tcp://10.36.228.19:46595'
2022-04-27 13:22:13,553 - distributed.worker - INFO - Start worker at: tcp://10.36.228.19:34273
2022-04-27 13:22:13,553 - distributed.worker - INFO - Listening to: tcp://10.36.228.19:34273
2022-04-27 13:22:13,553 - distributed.worker - INFO - dashboard at: 10.36.228.19:33865
2022-04-27 13:22:13,553 - distributed.worker - INFO - Waiting to connect to: tcp://brett-f4d63eb0-daskscheduler:8786
2022-04-27 13:22:13,553 - distributed.worker - INFO - -------------------------------------------------
2022-04-27 13:22:13,553 - distributed.worker - INFO - Threads: 4
2022-04-27 13:22:13,553 - distributed.worker - INFO - Memory: 7.82 GiB
2022-04-27 13:22:13,553 - distributed.worker - INFO - Local Directory: /src/dask-worker-space/worker-ykvkqgn_
2022-04-27 13:22:13,554 - distributed.worker - INFO - -------------------------------------------------
2022-04-27 13:22:17,292 - distributed.worker - INFO - Registered to: tcp://brett-f4d63eb0-daskscheduler:8786
2022-04-27 13:22:17,292 - distributed.worker - INFO - -------------------------------------------------
2022-04-27 13:22:17,293 - distributed.core - INFO - Starting established connection
2022-04-27 13:22:56,885 - distributed.utils_perf - INFO - full garbage collection released 105.93 MiB from 0 reference cycles (threshold: 9.54 MiB)
Thoughts
My guess is that when the worker enters a closing_gracefully state we don't immediately clear out ready work, though maybe we should.
Also, we should have some sanity checks so that close_gracefully can't go on forever.
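As a client-side stopgap (not a fix for the worker state machine), something like the following could flag workers sitting in closing_gracefully and force their retirement to completion. This is only a sketch assuming a connected Client c; in practice you would also track how long each worker has been in that state before acting.
def find_closing_gracefully(dask_scheduler):
    # Addresses of workers the scheduler believes are still draining.
    return [
        addr
        for addr, ws in dask_scheduler.workers.items()
        if ws.status.name == "closing_gracefully"
    ]

stuck = c.run_on_scheduler(find_closing_gracefully)
if stuck:
    # Force the drain to finish by closing these workers outright; whether
    # this is safe for in-flight data is exactly the open question above.
    c.retire_workers(workers=stuck, close_workers=True)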
About this issue
- State: closed
- Created 2 years ago
- Comments: 25 (14 by maintainers)
I’ll open a PR.
Yes, I think it's a bad idea - it adds to the complexity of the whole system, and it does not work at all when the retirement is initiated from the client or the scheduler. A timeout, if any, should be in Scheduler._track_retire_worker.
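To illustrate the shape of that idea (this is not the actual Scheduler._track_retire_worker code, and wait_for_drain / close_worker are placeholder names), a timeout there would amount to wrapping the wait on the drain in asyncio.wait_for and closing the worker anyway once it expires.
import asyncio

async def track_retire_with_timeout(wait_for_drain, close_worker, timeout=300):
    # wait_for_drain: coroutine function that resolves once the worker's keys
    # have been moved away; close_worker: coroutine function that shuts it down.
    try:
        await asyncio.wait_for(wait_for_drain(), timeout)
    except asyncio.TimeoutError:
        # Give up on a graceful drain after `timeout` seconds.
        pass
    await close_worker()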