prefect: Long running kubernetes jobs are marked as crashed by the agent

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn’t find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I have a flow that runs a Spark job using the prefect-databricks connector. If the job runs for more than 4 hours, the flow in Prefect is marked as crashed after 4 hours plus 1-4 minutes.

Reproduction

# Kubernetes job infra defined as

k8s_job = KubernetesJob(
    namespace=namespace,
    image=image_name,
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    finished_job_ttl=300,
    service_account_name="prefect-server",
    job_watch_timeout_seconds=180,
    customizations={ .. volumes & secrets here },
    env={
        .. bunch of envs here..
    },
)

# image is custom-built on top of Prefect 2.7.4:

ARG PREFECT_VERSION=2.7.4
ARG PYTHON_VERSION=python3.10
FROM prefecthq/prefect:${PREFECT_VERSION}-${PYTHON_VERSION}


# the Databricks job is submitted like this:

@flow
def databricks_run_reload(notebook_path, params):

    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=spark_python_notebook_task,
        task_key=task_key,
        timeout_seconds=86400,
        libraries=[]
    )

    idempotency_key = str(uuid.uuid4())
    multi_task_runs = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name=run_name,
        git_source=git,
        tasks=[job_task_settings],
        max_wait_seconds=86400,
        idempotency_token=idempotency_key
    )
    return multi_task_runs

# and used in another flow:

@flow
# @flow(persist_result=True, result_storage=S3.load(config.DEFAULT_S3_BLOCK_NAME)) ## tried this as well, but no difference
def do_reload():
    spark = databricks_run_reload(notebook_path="some_notebook", params=some_params)
    updated = execute_dml(merge_query, params=query_params, wait_for=[spark])

Error

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
    result = await run_sync(flow_call)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/opt/prefect/flows/spark.py", line 45, in do_reload
    updated = execute_dml(query, params=query_params, wait_for=[spark])
  File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 436, in __call__
    return enter_task_run_engine(
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 927, in enter_task_run_engine
    return run_async_from_worker_thread(begin_run)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
    return anyio.from_thread.run(call)
  File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
    return asynclib.run_async_from_thread(func, *args)
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
    return f.result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
    return self.__get_result()
  File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1068, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 100, in _get_state_result
    raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.

Versions

Version:             2.7.4
API version:         0.8.4
Python version:      3.10.8
Git commit:          30db76e7
Built:               Thu, Dec 22, 2022 2:55 PM
OS/Arch:             linux/x86_64
Profile:             dev
Server type:         hosted

Additional context

No response

About this issue

  • Original URL
  • State: closed
  • Created a year ago
  • Reactions: 1
  • Comments: 24 (14 by maintainers)

Most upvoted comments

We are presuming this is a bug in the Watch implementation in the Kubernetes library, which does not handle dropped connections. We’ll need to add some more robust wrapping to handle the failure modes there.
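
For anyone wondering what "more robust wrapping" could look like, here is a minimal sketch in plain Python. It is not Prefect's actual fix and does not use the Kubernetes client: `resilient_stream`, `open_stream`, and `is_terminal` are hypothetical names, and the assumption is that a dropped connection shows up either as an exception or as the stream simply ending before a terminal event has been seen.

```python
import time
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")


def resilient_stream(
    open_stream: Callable[[], Iterable[T]],
    is_terminal: Callable[[T], bool],
    max_reconnects: int = 5,
    backoff_seconds: float = 1.0,
) -> Iterator[T]:
    """Yield events from open_stream(), reopening the stream if it ends
    or fails before a terminal event has been seen.

    open_stream and is_terminal are hypothetical callables supplied by
    the caller; max_reconnects bounds the total number of reopen attempts.
    """
    reconnects = 0
    while True:
        try:
            for event in open_stream():
                yield event
                if is_terminal(event):
                    return  # the job really finished, stop watching
        except (ConnectionError, TimeoutError):
            # Treat a dropped connection like a stream that ended early.
            pass
        reconnects += 1
        if reconnects > max_reconnects:
            raise RuntimeError("stream kept dropping before a terminal event")
        time.sleep(backoff_seconds)
```

The point of the design is that a stream ending is never taken as proof that the job ended; only an explicit terminal event is. A reopened stream may replay events, so a real caller would also need to tolerate duplicates.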

Four hours is the maximum time limit for streaming connections in the kubelet, which causes the pod’s log loop to end even while the job/pod is still running.
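
A rough sketch of what that implies for whoever is watching the job: when the log stream ends, look at the job's status before deciding it crashed. The `JobStatus` dataclass below is a stand-in I made up for illustration; it only mirrors the `active`, `succeeded`, and `failed` counters that a Kubernetes Job status carries. `outcome_after_stream_end` is likewise hypothetical and is not how the Prefect agent is implemented.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class JobStatus:
    """Stand-in for the pod counters on a Kubernetes Job's status."""

    active: Optional[int] = None
    succeeded: Optional[int] = None
    failed: Optional[int] = None


def outcome_after_stream_end(status: JobStatus) -> str:
    """Classify a job once its log stream has ended.

    A closed log stream alone says nothing about the job, so the
    decision is based only on the status counters.
    """
    if status.succeeded:
        return "completed"
    if status.active:
        # Pods are still running: the stream was cut, the job was not.
        return "running"
    if status.failed:
        return "failed"
    return "unknown"
```

With this check, a job whose stream is cut at the 4-hour mark while a pod is still active would be classified as "running", and the watcher could reattach instead of reporting a crash.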

@chrisguidry sorry, there’s been some confusion on our part: we mixed up this issue with another one (which I opened myself): https://github.com/PrefectHQ/prefect/issues/10620

I wrongly assumed it might be related to our issue, but it looks like it might be a separate thing. So I’m probably not the one to ask for details on this. I’ll give the stage to whoever complained about it.