distributed: Transition queued->memory causes AssertionError

While trying to reproduce #7063, I came across a different error, this one with queueing enabled. The reproducer below is NOT minimal; there is likely quite a bit of room for simplification.

import asyncio

# Imports assumed: the reproducer is written against distributed's test utilities
from distributed import Event, Status, wait
from distributed.utils_test import BlockedGatherDep, gen_cluster, inc, wait_for_state

@gen_cluster(client=True, nthreads=[("", 1)], config={"distributed.scheduler.worker-saturation": 1.5})
async def test_steal_rootish_while_retiring(c, s, a):
    """https://github.com/dask/distributed/issues/7063

    Note that this applies to both tasks that raise Reschedule as well as work stealing.
    """
    ev = Event()

    # Put a task in memory on a, which will be retired, and prevent b from acquiring
    # a replica. This will cause a to be stuck in closing_gracefully state until we
    # set b.block_gather_dep.
    m = c.submit(inc, 1, key="m", workers=[a.address])
    await wait(m)

    async with BlockedGatherDep(s.address, nthreads=1) as b:
        # Large number of tasks to make sure they're rootish
        futures = c.map(
            lambda i, ev: ev.wait(), range(10), ev=ev, key=[f"x-{i}" for i in range(10)]
        )

        while a.state.executing_count != 1 or b.state.executing_count != 1:
            await asyncio.sleep(0.01)

        assert s.is_rootish(s.tasks[futures[0].key])

        retire_task = asyncio.create_task(c.retire_workers([a.address]))
        # Wait until AMM sends AcquireReplicasEvent to b to move away m
        await b.in_gather_dep.wait()
        assert s.workers[a.address].status == Status.closing_gracefully

        # Steal any of the tasks on a
        steal_key = next(iter(a.state.executing)).key
        s.reschedule(steal_key, stimulus_id="steal")
        await ev.set()

        # The stolen task can now complete on the other worker
        await wait_for_state(steal_key, "memory", b)
        await wait_for_state(steal_key, "memory", s)

        # Let graceful retirement of a complete.
        # This in turn reschedules whatever tasks were still processing on a to b.
        b.block_gather_dep.set()
        await retire_task
        await wait(futures)

The test is green; however, I see the following in the log:

  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 5284, in handle_task_finished
    r: tuple = self.stimulus_task_finished(
  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 4649, in stimulus_task_finished
    r: tuple = self._transition(
  File "/home/crusaderky/github/distributed/distributed/scheduler.py", line 1813, in _transition
    assert not args and not kwargs, (args, kwargs, start, finish)
AssertionError: ((), {'worker': 'tcp://127.0.0.1:45929', 'nbytes': 28, 'type': b'\x80\x04\x95\x15\x00\x00\x00\x00\x00\x00\x00\x8c\x08builtins\x94\x8c\x04bool\x94\x93\x94.', 'typename': 'bool', 'metadata': {}, 'thread': 139862053221952, 'startstops': ({'action': 'compute', 'start': 1666802403.9580944, 'stop': 1666802403.9590282},), 'status': 'OK'}, 'queued', 'memory')

What is happening:

  1. steal_key is processing on a
  2. steal_key is rescheduled, which causes the scheduler to send a free-keys message to a and put the task back in the queue
  3. before the free-keys message can reach a, steal_key finishes on a
  4. steal_key transitions to memory on a, sending a TaskFinishedMsg to the scheduler
  5. a queued->memory transition happens on the scheduler which, I suspect, is otherwise untested

This is timing-sensitive; if the free-keys message reached a before the task ended, then steal_key would be cancelled and would transition to forgotten without any messaging when it finishes.
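Because the test stays green while the assertion only shows up in the log, the failure is easy to miss. One way (a sketch, not verified) to make the reproducer fail loudly is to capture the scheduler-side log and assert it stays clean, using the existing captured_logger helper from distributed.utils_test. Since child loggers propagate to the "distributed" logger by default, capturing it should catch the traceback above regardless of which submodule emits it:

from distributed.utils_test import captured_logger

# inside the test body above, wrapping everything after the initial setup:
with captured_logger("distributed") as logs:
    ...  # rest of the reproducer, unchanged
# fail hard if the queued->memory AssertionError was merely logged
assert "AssertionError" not in logs.getvalue()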

Most upvoted comments

Below is the scheduler transition log. FOO and BAR are two keys such that FOO depends on BAR.

BAR is computed on WorkerA (I changed the stimulus IDs slightly to include the worker address)

The transition waiting -> memory is triggered by a task-finished message from WorkerB.

I think what happens is…

  • WorkerA finishes BAR
  • FOO is transitioned to processing and assigned to WorkerB
  • WorkerB fetches BAR and sends an add_keys message to the scheduler
  • In the meantime, WorkerA dies, causing BAR to be scheduled back to released/waiting. The scheduler queues up a free-keys message intended for WorkerB to cancel FOO
  • The add_keys from WorkerB only arrives after WorkerA has been removed and all tasks have been transitioned. This should trigger a remove-replicas (https://github.com/dask/distributed/blob/9c8ff86ecb5f26c2e8da4510c8a320bb396ead1c/distributed/scheduler.py#L6790-L6803) but I haven't confirmed it
  • WorkerB computes FOO and submits its result to the scheduler
  • WorkerB receives the free-keys message from the initial cancellation (the stale task-finished is visible in the transition log below; see also the sketch after it)
[
    (
        "FOO",
        "released",
        "waiting",
        {},
        "update-graph-1666955210.246825",
        1666955210.291511,
    ),
    (
        "BAR",
        "processing",
        "memory",
        {...},
        "task-finished-1666955230.056821",
        1666955230.1450088,
    ),
    (
        "FOO",
        "waiting",
        "processing",
        {},
        "task-finished-1666955230.056821",
        1666955230.145199,
    ),
    (
        "BAR",
        "memory",
        "released",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955230.327744,
    ),
    (
        "FOO",
        "processing",
        "released",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955230.327806,
    ),
    (
        "FOO",
        "released",
        "waiting",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955230.3278348,
    ),
    (
        "FOO",
        "waiting",
        "memory",
        {...},
        "task-finished-1666955230.232949",
        1666955261.059616,
    ),
    (
        "FOO",
        "memory",
        "released",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955262.808954,
    ),
    (
        "FOO",
        "released",
        "waiting",
        {...},
        "handle-worker-cleanup-WorkerA-1666955230.323402",
        1666955262.809029,
    ),
]
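
The out-of-order stimulus is easy to spot mechanically in a log of this shape: the waiting -> memory entry for FOO carries the stimulus task-finished-1666955230.232949, which is older than the handle-worker-cleanup stimulus that had already released the key. A small sketch (my own helper, not part of distributed) that scans such a log, assuming entries are (key, start, finish, recommendations, stimulus_id, timestamp) tuples as above and that stimulus IDs end in the time at which they were issued:

def stale_transitions(log):
    """Yield transitions whose stimulus is older than the newest stimulus
    already applied to the same key, i.e. responses that arrived out of order."""

    def stimulus_time(stimulus_id):
        # e.g. "task-finished-1666955230.056821" -> 1666955230.056821
        return float(stimulus_id.rsplit("-", 1)[-1])

    newest = {}  # key -> timestamp of the newest stimulus seen so far
    for key, start, finish, _, stimulus_id, _ in log:
        t = stimulus_time(stimulus_id)
        if t < newest.get(key, 0.0):
            yield key, start, finish, stimulus_id
        newest[key] = max(newest.get(key, 0.0), t)

For the log above this yields only the suspicious ("FOO", "waiting", "memory", "task-finished-1666955230.232949") entry.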