distributed: Adaptive stops after some time

Hi! First off, ❤️ dask, so thank you!

What happened: I’m using dask-kubernetes (0.11.0) to run a distributed dask cluster on top of Azure Kubernetes Service. The cluster itself consists of two node pools with autoscaling enabled:

  • stable: VMs of standard priority (i.e. guaranteed availability)
  • spot: VMs of spot priority (i.e. preemptible, they can be deleted at any time), used to save on compute costs

I’m using the adaptive feature of dask-distributed to make sure the pool of spot instances gets “renewed” as spot nodes are deleted. At first everything works as expected: spot nodes get created to replace the deleted ones.

After some time though (20-30 minutes), the adaptive feature stops working with:

distributed.deploy.adaptive_core - INFO - Adaptive stop

Looking at the code, it seems this happens when an OSError is raised. I couldn’t find more details about it though, and I would love to learn more.
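For context, here is a hedged sketch of the relevant logic as I understand it from distributed.deploy.adaptive_core (paraphrased and simplified, not the exact source): the periodic adapt() call wraps its work in a try/except, and any OSError makes it call stop(), which emits the “Adaptive stop” log line and cancels the periodic callback for good, so adaptivity does not resume on its own.

# Rough, simplified paraphrase of AdaptiveCore in distributed.deploy.adaptive_core;
# the method names exist in the real module, but the bodies are abbreviated here.
async def adapt(self):
    try:
        target = await self.safe_target()               # ask for the desired number of workers
        recommendations = await self.recommendations(target)
        # ... scale up / retire workers based on the recommendations ...
    except OSError:
        # Any OSError (e.g. a dropped connection while talking to the
        # scheduler or the cluster manager) permanently stops the loop.
        self.stop()

def stop(self):
    logger.info("Adaptive stop")
    if self.periodic_callback:
        self.periodic_callback.stop()
        self.periodic_callback = None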

What you expected to happen: I would expect adaptive scaling to keep working.

Minimal Complete Verifiable Example: Not runnable as is, sorry.

from dask_kubernetes import KubeCluster
from distributed import Client

cluster = KubeCluster()
client = Client(cluster)
cluster.scale(80)
cluster.adapt(minimum=75, maximum=80, wait_count=10, interval='10s')
# ... actual dask job ...

Anything else we need to know?: Somehow using adapt to handle preemptible nodes getting deleted feels a bit like a workaround. I’m not aware of a dedicated dask-kubernetes or distributed feature for this, but maybe I just missed it.

Environment:

  • Dask version: 2.30.0
  • Python version: 3.6.12
  • Operating System: Debian testing
  • Install method (conda, pip, source): pip (via poetry)
  • Distributed version: 2.30.0

About this issue

  • State: closed
  • Created 3 years ago
  • Reactions: 1
  • Comments: 18 (18 by maintainers)

Most upvoted comments

Actually, there is an issue in the above code: it can call close_gracefully several times for a given worker. Here is the latest version:

import logging

import aiohttp
from distributed.diagnostics.plugin import WorkerPlugin
from tornado.ioloop import IOLoop, PeriodicCallback

logger = logging.getLogger(__name__)


class PreemptibleListener(WorkerPlugin):
    """Poll the Azure instance metadata service and gracefully shut the
    worker down when its node is scheduled for preemption."""

    def __init__(
        self, interval=1, metadata_url=None,
        termination_events=None
    ):
        self.callback = None
        self.loop = None
        self.worker = None
        self.interval_s = interval
        self.metadata_url = (
            metadata_url or
            'http://169.254.169.254/metadata/scheduledevents?api-version=2019-08-01'
        )
        self.termination_events = termination_events or ['Preempt', 'Terminate']
        # Guard so close_gracefully() is only triggered once per worker.
        self.terminating = False

    async def poll_status(self):
        if self.terminating:
            return
        async with aiohttp.ClientSession(headers={'Metadata': 'true'}) as session:
            async with session.get(self.metadata_url) as response:
                data = await response.json()
                for evt in data['Events']:
                    event_type = evt['EventType']
                    if event_type in self.termination_events:
                        self.terminating = True
                        logger.info(
                            'Node will be deleted, attempting graceful shutdown '
                            '(event=%s, worker=%s)', evt, self.worker.name
                        )
                        await self.worker.close_gracefully()

    def setup(self, worker):
        self.worker = worker
        self.loop = IOLoop.current()
        # PeriodicCallback expects the interval in milliseconds.
        self.callback = PeriodicCallback(
            self.poll_status,
            callback_time=self.interval_s * 1_000
        )
        self.loop.add_callback(self.callback.start)

    def teardown(self, worker):
        logger.info('Tearing down plugin for worker %s', self.worker.name)
        if self.callback:
            self.callback.stop()
            self.callback = None
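A minimal usage sketch, assuming the cluster/client from the example above; Client.register_worker_plugin is the standard distributed API for installing a worker plugin on every current and future worker, and the name argument here is just a label I picked:

from distributed import Client

client = Client(cluster)  # cluster is the KubeCluster from the example above
# Install the plugin on every worker, including ones started later by adapt().
client.register_worker_plugin(PreemptibleListener(interval=1), name='preemptible-listener')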

I would love to contribute it and I’m ready to spend some time on it. I just feel like things need to be a bit more polished before doing that, so I’m going to do some testing on production workloads to make sure it improves the situation.