prefect: Long running kubernetes jobs are marked as crashed by the agent
First check
- I added a descriptive title to this issue.
- I used the GitHub search to find a similar issue and didn’t find it.
- I searched the Prefect documentation for this issue.
- I checked that this issue is related to Prefect and not one of its dependencies.
Bug summary
I have a flow that runs a Spark job using the prefect-databricks connector. If the job runs for more than 4 hours, the flow in Prefect is marked as crashed after 4 hours plus 1-4 minutes.
Reproduction
# Kubernetes job infra defined as
from prefect.infrastructure import KubernetesJob, KubernetesImagePullPolicy

k8s_job = KubernetesJob(
    namespace=namespace,
    image=image_name,
    image_pull_policy=KubernetesImagePullPolicy.ALWAYS,
    finished_job_ttl=300,
    service_account_name="prefect-server",
    job_watch_timeout_seconds=180,
    customizations={ .. volumes & secrets here },
    env={
        .. bunch of envs here ..
    },
)
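For context, this is roughly how such an infrastructure block is saved and attached to a deployment in Prefect 2.x; the block and deployment names below are hypothetical and not taken from the original report:
# (hypothetical) save the block and attach it to a deployment of the do_reload flow shown below
from prefect.deployments import Deployment

k8s_job.save("spark-k8s-infra", overwrite=True)

deployment = Deployment.build_from_flow(
    flow=do_reload,
    name="do-reload-k8s",
    infrastructure=k8s_job,
)
deployment.apply()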
# image is custom-built on top of the prefect 2.7.4 base image:
ARG PREFECT_VERSION=2.7.4
ARG PYTHON_VERSION=python3.10
FROM prefecthq/prefect:${PREFECT_VERSION}-${PYTHON_VERSION}
# databricks job is submitted like this:
import uuid

from prefect import flow
from prefect_databricks.flows import jobs_runs_submit_and_wait_for_completion
from prefect_databricks.models.jobs import JobTaskSettings


@flow
def databricks_run_reload(notebook_path, params):
    # compile job task settings
    job_task_settings = JobTaskSettings(
        new_cluster=new_cluster,
        notebook_task=spark_python_notebook_task,
        task_key=task_key,
        timeout_seconds=86400,
        libraries=[],
    )
    idempotency_key = str(uuid.uuid4())
    multi_task_runs = jobs_runs_submit_and_wait_for_completion(
        databricks_credentials=databricks_credentials,
        run_name=run_name,
        git_source=git,
        tasks=[job_task_settings],
        max_wait_seconds=86400,
        idempotency_token=idempotency_key,
    )
    return multi_task_runs
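For reference, the databricks_credentials value used above would typically be loaded from a saved prefect-databricks credentials block; a minimal sketch, with a hypothetical block name:
from prefect_databricks import DatabricksCredentials

# load a previously saved credentials block (name is illustrative)
databricks_credentials = DatabricksCredentials.load("my-databricks-creds")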
# and used in another flow:
@flow
# @flow(persist_result=True, result_storage=S3.load(config.DEFAULT_S3_BLOCK_NAME))  # tried this as well, but no difference
def do_reload():
    spark = databricks_run_reload(notebook_path="some_notebook", params=some_params)
    updated = execute_dml(merge_query, params=query_params, wait_for=[spark])
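Not part of the original report, but for illustration: calling the subflow with return_state=True lets the parent flow inspect the subflow's state instead of raising when the result cannot be retrieved; a sketch using the same hypothetical parameters:
@flow
def do_reload_with_state_check():
    # ask Prefect for the subflow's final state rather than its return value
    spark_state = databricks_run_reload(
        notebook_path="some_notebook", params=some_params, return_state=True
    )
    if spark_state.is_completed():
        execute_dml(merge_query, params=query_params)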
Error
Traceback (most recent call last):
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 637, in orchestrate_flow_run
result = await run_sync(flow_call)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 91, in run_sync_in_worker_thread
return await anyio.to_thread.run_sync(
File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
return await get_asynclib().run_sync_in_worker_thread(
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
return await future
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
result = context.run(func, *args)
File "/opt/prefect/flows/spark.py", line 45, in do_reload
updated = execute_dml(query, params=query_params, wait_for=[spark])
File "/usr/local/lib/python3.10/site-packages/prefect/tasks.py", line 436, in __call__
return enter_task_run_engine(
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 927, in enter_task_run_engine
return run_async_from_worker_thread(begin_run)
File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 177, in run_async_from_worker_thread
return anyio.from_thread.run(call)
File "/usr/local/lib/python3.10/site-packages/anyio/from_thread.py", line 49, in run
return asynclib.run_async_from_thread(func, *args)
File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 970, in run_async_from_thread
return f.result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 458, in result
return self.__get_result()
File "/usr/local/lib/python3.10/concurrent/futures/_base.py", line 403, in __get_result
raise self._exception
File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1068, in get_task_call_return_value
return await future._result()
File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 100, in _get_state_result
raise MissingResult(
prefect.exceptions.MissingResult: State data is missing. Typically, this occurs when result persistence is disabled and the state has been retrieved from the API.
Versions
Version: 2.7.4
API version: 0.8.4
Python version: 3.10.8
Git commit: 30db76e7
Built: Thu, Dec 22, 2022 2:55 PM
OS/Arch: linux/x86_64
Profile: dev
Server type: hosted
Additional context
No response
About this issue
- Original URL
- State: closed
- Created a year ago
- Reactions: 1
- Comments: 24 (14 by maintainers)
We are presuming this is a bug with the Watch implementation in the Kubernetes library, where it does not handle dropped connections; we'll need to add some more robust wrapping to handle failure modes there.
4 hours is the maximum time limit for streaming connections in kubelet, which causes the pod log loop to end even when the job/pod is still running.
@chrisguidry sorry, there's been some kind of confusion on our part since we mixed up this issue with this one (which I opened myself): https://github.com/PrefectHQ/prefect/issues/10620. I wrongly assumed it might be related to our issue, but it looks like it might be a separate thing, so I'm probably not the one to ask for details on this. I'll give the stage to whoever complained about it.
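For illustration, a minimal sketch (not the actual Prefect code) of the reconnecting-watch pattern the comments above describe, using the official kubernetes Python client; the function name, namespace, and job name are hypothetical, and loading the kube config is assumed to happen elsewhere:
from kubernetes import client, watch
from urllib3.exceptions import ProtocolError


def watch_job_until_complete(namespace: str, job_name: str) -> bool:
    """Watch a Job and return True if it succeeded, False if it failed."""
    batch_v1 = client.BatchV1Api()
    while True:
        try:
            stream = watch.Watch().stream(
                batch_v1.list_namespaced_job,
                namespace=namespace,
                field_selector=f"metadata.name={job_name}",
            )
            for event in stream:
                status = event["object"].status
                if status.succeeded:
                    return True
                if status.failed:
                    return False
        except ProtocolError:
            # the connection was dropped mid-stream; fall through and reconnect
            pass
        # the server closed the stream (e.g. kubelet's streaming time limit)
        # before the job finished, so re-establish the watch instead of giving up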