prefect: `ValueError: Path /root/.prefect/storage/... does not exist`

Prefect 2.0b15 has an issue where a task will occasionally fail because it can't find its serialized result file. It almost looks like a race condition: is the task starting before the file is written? The error appears sporadically after ~100 or more tasks have executed, not consistently.

18:55:13.371 | DEBUG   | prefect.engine - Reported crashed task run 'mouse_detection-49dd9cf8-37-b6710e8b93d84a3ea96ea1d76f546466-1' successfully.
18:55:13.371 | INFO    | Task run 'mouse_detection-49dd9cf8-75' - Crash detected! Execution was interrupted by an unexpected exception.
18:55:13.372 | DEBUG   | Task run 'mouse_detection-49dd9cf8-75' - Crash details:
Traceback (most recent call last):
  File "/root/venv/lib/python3.8/site-packages/prefect_dask/task_runners.py", line 236, in wait
    return await future.result(timeout=timeout)
  File "/root/venv/lib/python3.8/site-packages/distributed/client.py", line 294, in _result
    raise exc.with_traceback(tb)
  File "/root/venv/lib/python3.8/site-packages/prefect/engine.py", line 957, in begin_task_run
    task_run.state.data._cache_data(await _retrieve_result(task_run.state))
  File "/root/venv/lib/python3.8/site-packages/prefect/results.py", line 38, in _retrieve_result
    serialized_result = await _retrieve_serialized_result(state.data)
  File "/root/venv/lib/python3.8/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/root/venv/lib/python3.8/site-packages/prefect/results.py", line 34, in _retrieve_serialized_result
    return await filesystem.read_path(result.key)
  File "/root/venv/lib/python3.8/site-packages/prefect/filesystems.py", line 79, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /root/.prefect/storage/b371ce283437427e8f66dc8bb6de5d7c does not exist.

About this issue

  • State: closed
  • Created 2 years ago
  • Reactions: 3
  • Comments: 29 (11 by maintainers)

Most upvoted comments

UPDATE: Turns out the error was entirely due to the way we configured our k8s container. Really sorry for the trouble! We customized the k8s container spec with a readinessProbe and livenessProbe; these are the default settings we use for our FastAPI k8s deployments. However, they do not play well with Prefect, and the k8s cluster prematurely destroys "successful" pods. The symptom of k8s destroying the pod is the ValueError: Path /root/.prefect/storage/... does not exist error (and a "crashed" state for the very first task). The probe settings we had applied were:

{
    "name": "prefect-job",
    "ports": [{"containerPort": 8888, "protocol": "TCP"}],
    "readinessProbe": {
        "tcpSocket": {"port": 8888},
        "initialDelaySeconds": 10,
        "periodSeconds": 10,
        "timeoutSeconds": 5,
        "successThreshold": 1,
        "failureThreshold": 5
    },
    "livenessProbe": {
        "tcpSocket": {"port": 8888},
        "initialDelaySeconds": 15,
        "periodSeconds": 20,
        "timeoutSeconds": 5,
        "successThreshold": 1,
        "failureThreshold": 5
    }
}

We removed the two probes and our flow runs started working again. From this experience (and this thread from Slack), perhaps this specific ValueError: Path ... does not exist is the first exception raised whenever the flow run's pod crashes?
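For anyone who customized the job container the same way, the change in our case amounted to keeping the rest of the spec and dropping the two probe blocks. A minimal sketch of that change follows; the strip_probes helper and the trimmed-down spec literal are hypothetical, just to illustrate it:

# Hypothetical helper: remove readiness/liveness probes from a k8s container
# spec before it is used to customize the Prefect job. The probes are what
# caused the cluster to tear down otherwise healthy flow-run pods in our case.
def strip_probes(container_spec: dict) -> dict:
    """Return a copy of the container spec without readiness/liveness probes."""
    cleaned = dict(container_spec)
    cleaned.pop("readinessProbe", None)
    cleaned.pop("livenessProbe", None)
    return cleaned

container_spec = {
    "name": "prefect-job",
    "ports": [{"containerPort": 8888, "protocol": "TCP"}],
    "readinessProbe": {"tcpSocket": {"port": 8888}, "periodSeconds": 10},
    "livenessProbe": {"tcpSocket": {"port": 8888}, "periodSeconds": 20},
}

print(strip_probes(container_spec))
# -> {'name': 'prefect-job', 'ports': [{'containerPort': 8888, 'protocol': 'TCP'}]}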

Also getting this same error (Prefect 2.2.0, KubernetesJob, tested on both the default and Dask task runners). Not sure if this helps, but I noticed that I only get this error for flows where a task takes longer than ~5:30 min to run… Are there any timeout / magic config numbers related to anyio or prefect.engine at the task level that might be set to 330 seconds?

NOTE: I do not get this error when running the flow locally.
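To check whether task duration alone triggers it, a minimal flow like the sketch below (one task sleeping past the suspected ~330 s mark; the flow and task names are placeholders) should be enough:

import time

from prefect import flow, task, get_run_logger

@task
def slow_task(seconds: int) -> int:
    """Sleep past the suspected ~330 s mark to see if duration alone crashes the run."""
    logger = get_run_logger()
    logger.info(f"Sleeping for {seconds} seconds...")
    time.sleep(seconds)
    logger.info("Done sleeping.")
    return seconds

@flow
def timeout_repro_flow(seconds: int = 360):
    # 360 s is just past the ~5:30 threshold observed above
    return slow_task(seconds)

if __name__ == "__main__":
    timeout_repro_flow()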

Traceback:

Encountered exception during execution:
Traceback (most recent call last):
  File "/opt/conda/lib/python3.9/site-packages/prefect/engine.py", line 587, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/opt/conda/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 116, in run_sync_in_interruptible_worker_thread
    tg.start_soon(
  File "/opt/conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 662, in __aexit__
    raise exceptions[0]
  File "/opt/conda/lib/python3.9/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/opt/conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/opt/conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/conda/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 96, in capture_worker_thread_and_result
    result = __fn(*args, **kwargs)
  File "/opt/prefect/flows/src/tscatalog/flows/etl.py", line 329, in process_dataset
    loader = initialize_loader(loader_cls, params or {})
  File "/opt/conda/lib/python3.9/site-packages/prefect/tasks.py", line 294, in __call__
    return enter_task_run_engine(
  File "/opt/conda/lib/python3.9/site-packages/prefect/engine.py", line 727, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/opt/conda/lib/python3.9/site-packages/prefect/utilities/asyncutils.py", line 136, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/opt/conda/lib/python3.9/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/opt/conda/lib/python3.9/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/opt/conda/lib/python3.9/concurrent/futures/_base.py", line 446, in result
    return self.__get_result()
  File "/opt/conda/lib/python3.9/concurrent/futures/_base.py", line 391, in __get_result
    raise self._exception
  File "/opt/conda/lib/python3.9/site-packages/prefect/engine.py", line 859, in create_task_run_then_submit
    return await future._result()
  File "/opt/conda/lib/python3.9/site-packages/prefect/futures.py", line 227, in _result
    return final_state.result(raise_on_failure=raise_on_failure)
  File "/opt/conda/lib/python3.9/site-packages/prefect/orion/schemas/states.py", line 145, in result
    raise data
  File "/opt/conda/lib/python3.9/site-packages/prefect/task_runners.py", line 214, in submit
    result = await run_fn(**run_kwargs)
  File "/opt/conda/lib/python3.9/site-packages/prefect/engine.py", line 1017, in begin_task_run
    task_run.state.data._cache_data(await _retrieve_result(task_run.state))
  File "/opt/conda/lib/python3.9/site-packages/prefect/results.py", line 38, in _retrieve_result
    serialized_result = await _retrieve_serialized_result(state.data)
  File "/opt/conda/lib/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/conda/lib/python3.9/site-packages/prefect/results.py", line 34, in _retrieve_serialized_result
    return await filesystem.read_path(result.key)
  File "/opt/conda/lib/python3.9/site-packages/prefect/filesystems.py", line 149, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /home/micromamba/.prefect/storage/021dd2fbc6d54a39bd1e150cbbf5f0b1 does not exist.

I have been working on a Prefect 2.0 POC deployed to OpenShift with:

  • Orion API server
  • Orion agents
  • Minio for remote storage
  • PostgreSQL database

Using this test flow, deployed to an S3 bucket on the standalone Minio instance, I was able to reproduce it quite consistently:

import random
import time

from prefect import flow, task, get_run_logger

@task
def load_task(name):
    logger = get_run_logger()
    logger.info(f"Executing task on {name}!")
    # Simulate a task of variable duration
    time.sleep(random.randrange(3, 10))
    logger.info(f"Task on {name} complete!")
    return name

@flow
def main_flow(task_count):
    logger = get_run_logger()
    logger.info("Executing flow...")
    tasks = [f"task_{i}" for i in range(1, task_count + 1)]
    # Fan the tasks out with .map() and block until each one finishes
    for future in load_task.map(tasks):
        future.wait()
    return

if __name__ == "__main__":
    main_flow(50)
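The deployment's remote storage was an S3 bucket on the Minio instance; one way such storage is typically wired up in Prefect 2 is a RemoteFileSystem block, sketched below with a placeholder bucket name, credentials, and endpoint URL rather than the actual values used here (requires s3fs):

from prefect.filesystems import RemoteFileSystem

# Hypothetical storage block for a Minio-backed S3 bucket; the bucket name,
# credentials, and endpoint URL below are placeholders.
minio_storage = RemoteFileSystem(
    basepath="s3://prefect-flows",
    settings={
        "key": "minio-access-key",
        "secret": "minio-secret-key",
        "client_kwargs": {"endpoint_url": "http://minio.example.svc:9000"},
    },
)
minio_storage.save("minio-storage", overwrite=True)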

For example, using Postman to trigger the flow 10 times in quick succession by POSTing to {{prefect_host}}/api/deployments/:id/create_flow_run with this payload:

{
  "name": "Load Test Flow",
  "idempotency_key": "{{$guid}}",
  "parameters": {
      "name": "Load Testing"
  },
  "state": {
        "type": "SCHEDULED",
        "message":"Run through API"
    }
}
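The same burst can also be scripted instead of clicking through Postman; a minimal sketch using requests, where the API URL and deployment ID are placeholders to fill in:

import uuid

import requests

# Placeholders: point these at your Orion API and the deployment under test.
PREFECT_API_URL = "http://prefect.example.com/api"
DEPLOYMENT_ID = "00000000-0000-0000-0000-000000000000"

for i in range(10):
    payload = {
        "name": f"Load Test Flow {i}",
        "idempotency_key": str(uuid.uuid4()),
        "parameters": {"name": "Load Testing"},
        "state": {"type": "SCHEDULED", "message": "Run through API"},
    }
    # POST to the same create_flow_run endpoint shown above
    resp = requests.post(
        f"{PREFECT_API_URL}/deployments/{DEPLOYMENT_ID}/create_flow_run",
        json=payload,
    )
    resp.raise_for_status()
    print(f"Created flow run {resp.json().get('id')}")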

I got a crash on 1 of the 10 flows.

Here is the debug output:

Crash details:
Traceback (most recent call last):
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/engine.py", line 946, in begin_task_run
    return await orchestrate_task_run(
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/engine.py", line 1079, in orchestrate_task_run
    state = await client.propose_state(
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/client.py", line 1770, in propose_state
    raise prefect.exceptions.Abort(response.details.reason)
prefect.exceptions.Abort: This run has already terminated.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/task_runners.py", line 316, in _run_and_store_result
    self._results[run_key] = await run_fn(**run_kwargs)
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/engine.py", line 959, in begin_task_run
    task_run.state.data._cache_data(await _retrieve_result(task_run.state))
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/results.py", line 38, in _retrieve_result
    serialized_result = await _retrieve_serialized_result(state.data)
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/client.py", line 104, in with_injected_client
    return await fn(*args, **kwargs)
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/results.py", line 34, in _retrieve_serialized_result
    return await filesystem.read_path(result.key)
  File "/opt/app-root/lib64/python3.9/site-packages/prefect/filesystems.py", line 147, in read_path
    raise ValueError(f"Path {path} does not exist.")
ValueError: Path /opt/app-root/src/.prefect/storage/fabecc0c738c4a30af048739b2660a80 does not exist.