prefect: ModuleNotFoundError when simultaneously running deployed flows using custom image

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn’t find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

I’m not 100% sure that this has anything to do with prefect-dbt; however, the flows that use it are the only ones experiencing this issue. I have several very simple flows that each execute a dbt CLI command. If I run them one at a time, each flow loads fine and the code runs. If, however, I attempt to run 2 flows that use the prefect-dbt package at the same time, the second flow will not run and keeps returning an error similar to the one below until the 1st flow has finished running.

Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/opt/prefect/weekly_tests_dbt_run.py", line 7, in <module>
    from dbt_provider import DbtProvider
  File "/opt/prefect/dbt_provider.py", line 3, in <module>
    from prefect_dbt.cli.commands import DbtCliProfile
ModuleNotFoundError: No module named 'prefect_dbt'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 260, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 182, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'weekly_tests_dbt_run.py' encountered an exception

In addition to this error, obtained from the UI, I also see the following in my cluster logs.

/usr/local/lib/python3.10/runpy.py:126: RuntimeWarning: 'prefect.engine' found in sys.modules after import of package 'prefect', but prior to execution of 'prefect.engine'; this may result in unpredictable behaviour
  warn(RuntimeWarning(msg))
08:13:56.991 | DEBUG   | prefect.client - Connecting to API at https://api.prefect.cloud/api/accounts/7381476c-4650-4f07-82e0-211613919c54/workspaces/935a408d-d2c6-4007-8777-c09196d5b90f/
/usr/local/lib/python3.10/site-packages/prefect/deployments.py:161: UserWarning: Block document has schema checksum sha256:f26ee075481470374a3084b12031692f5f2c6106556d24980bf060c5a1313470 which does not match the schema checksum for class 'Azure'. This indicates the schema has changed and this block may not load.
  storage_block = Block._from_block_document(storage_document)
08:13:57.750 | DEBUG   | Flow run 'congenial-butterfly' - Loading flow for deployment 'production-test'...
08:13:57.755 | ERROR   | Flow run 'congenial-butterfly' - Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/opt/prefect/production_test_dbt_run.py", line 7, in <module>
    from dbt_provider import DbtProvider
  File "/opt/prefect/dbt_provider.py", line 3, in <module>
    from prefect_dbt import DbtCliProfile
ModuleNotFoundError: No module named 'prefect_dbt'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 260, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 182, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'production_test_dbt_run.py' encountered an exception

Attached are 2 sample flows that execute dbt CLI commands along with the definition of the DbtProvider class. I’ve also attached my k8s yaml file. Note I’ve changed the extensions so that I can attach them and faked some IDs.

dbt_provider.txt production_test_dbt_run.txt weekly_tests_dbt_run.txt dev.txt
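
To narrow down whether the failing run sees a different Python environment from the one that works, a small diagnostic along these lines could be run at the top of a flow script or inside the failing pod. This is only a sketch using the standard library; the helper name `import_environment_report` is hypothetical and is not part of Prefect or prefect-dbt.

```python
import importlib.util
import socket
import sys


def import_environment_report(module_name: str) -> dict:
    """Collect facts about whether, and from where, a module can be imported."""
    try:
        spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError):
        # find_spec can raise for dotted names with a missing parent package
        # or for modules with a broken __spec__; treat both as "not found".
        spec = None
    return {
        "host": socket.gethostname(),      # identifies the pod/container
        "executable": sys.executable,      # which interpreter is running
        "module": module_name,
        "found": spec is not None,
        "origin": getattr(spec, "origin", None),
        "sys_path": list(sys.path),        # where imports are searched
    }


if __name__ == "__main__":
    for key, value in import_environment_report("prefect_dbt").items():
        print(f"{key}: {value}")
```

Comparing the output from a run that works with one that fails would show whether the two runs use the same interpreter and search path.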

Reproduction

prefect version
Version:             2.7.10
API version:         0.8.4
Python version:      3.10.9
Git commit:          f269d49b
Built:               Thu, Jan 26, 2023 3:51 PM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         cloud

Deployment: Azure Kubernetes, using a custom image built from a Dockerfile that has only the following 2 lines:

FROM prefecthq/prefect:2.7.10-python3.10
RUN pip install adlfs psutil prefect-fivetran prefect-dbt dbt-databricks==1.3
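
One way to confirm that the built image really contains the packages from the RUN line is to list their installed versions from inside the container. The following is a minimal sketch, assuming it is saved as a script and run with the image’s Python interpreter; `installed_versions` is an illustrative helper, not a Prefect API.

```python
from importlib import metadata


def installed_versions(distributions):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in distributions:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    # Distribution names taken from the RUN line of the Dockerfile above.
    expected = ["adlfs", "psutil", "prefect-fivetran", "prefect-dbt", "dbt-databricks"]
    for name, version in installed_versions(expected).items():
        print(f"{name}: {version or 'NOT INSTALLED'}")
```

If every package reports a version here but a flow run still raises ModuleNotFoundError, the run is probably not using this image or this interpreter.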

About this issue

  • State: open
  • Created a year ago
  • Comments: 23 (12 by maintainers)

Most upvoted comments

Just a note: while I can now run 2 at the same time, doing so results in the 1st one crashing mid-run…

Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 114, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
  Could not run dbt
02:30:53 PM
trigger_dbt_cli_command-321ca940-1
Finished in state Failed('Task run encountered an exception: RuntimeError: Command failed with exit code 2:\n  Could not run dbt\n\n')
02:30:53 PM
trigger_dbt_cli_command-321ca940-1
Encountered exception during execution:
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 636, in orchestrate_flow_run
    result = await flow_call()
  File "/opt/prefect/production_test_dbt_run.py", line 23, in production_test_dbt_run
    await provider.run_cli_command(command)
  File "/opt/prefect/dbt_provider.py", line 56, in run_cli_command
    await trigger_dbt_cli_command(command_text, dbt_cli_profile=self.dbt_cli_profile, profiles_dir='./dbt/profiles', overwrite_profiles=True, project_dir='./dbt')
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1082, in get_task_call_return_value
    return await future._result()
  File "/usr/local/lib/python3.10/site-packages/prefect/futures.py", line 237, in _result
    return await final_state.result(raise_on_failure=raise_on_failure, fetch=True)
  File "/usr/local/lib/python3.10/site-packages/prefect/states.py", line 89, in _get_state_result
    raise await get_state_exception(state)
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 1481, in orchestrate_task_run
    result = await task.fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_dbt/cli/commands.py", line 158, in trigger_dbt_cli_command
    result = await shell_run_command.fn(command=command, **shell_run_command_kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect_shell/commands.py", line 114, in shell_run_command
    raise RuntimeError(msg)
RuntimeError: Command failed with exit code 2:
  Could not run dbt

That does appear to fix it! I was able to fire off a dozen or so simultaneous flows that access this same set of packages with no issue.

That said, it would seem this is contrary to the recommended approach in the documentation.

I can use this as a temporary workaround, but I assume that I’ll still want to continue to build my own image and remove this once the bug is identified and fixed?

One more note on the bug, which I was going to add because it just started happening when I added retries (and before I pushed out your changes): if I add automatic retries with a 10-second delay, the first retry fails with the ModuleNotFoundError, but the second retry runs.
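
The retry behaviour suggests the import failure is transient. A helper like the one below could be used to probe that: it retries an import after a delay and prints each failure. It is a diagnostic sketch only, not a fix, since the root cause has not been identified in this thread; `import_with_retry` and its parameters are hypothetical names.

```python
import importlib
import time


def import_with_retry(module_name, attempts=3, delay_seconds=10.0):
    """Try to import a module several times, pausing between failed attempts."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    last_error = None
    for attempt in range(1, attempts + 1):
        try:
            # Drop cached directory listings so newly visible packages are found.
            importlib.invalidate_caches()
            return importlib.import_module(module_name)
        except ModuleNotFoundError as exc:
            last_error = exc
            print(f"attempt {attempt}/{attempts} failed: {exc}")
            if attempt < attempts:
                time.sleep(delay_seconds)
    raise last_error


if __name__ == "__main__":
    module = import_with_retry("prefect_dbt", attempts=3, delay_seconds=10.0)
    print(f"imported {module.__name__} from {getattr(module, '__file__', None)}")
```

If the import succeeds on a later attempt in the same process, that would point to the package becoming visible after the process has started, rather than to the package being absent from the image.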

Just did. Here is the flow:

import asyncio

from prefect import flow

# Imported only to reproduce the failure; neither name is used below.
from prefect_dbt.cli.commands import DbtCliProfile
from prefect_dbt.cli.commands import trigger_dbt_cli_command


@flow
async def production_test_dbt_run(
    databricks_warehouse_id: str = "fadf894ad6",
    schema_name: str = "prefect",
    catalog_name: str = "dev",
):
    # The body is intentionally empty: the error occurs while the module is loaded.
    pass


if __name__ == "__main__":
    asyncio.run(production_test_dbt_run())

The other one is basically the same, just with a different name. The result is the same:

Flow could not be retrieved from deployment.
Traceback (most recent call last):
  File "<frozen importlib._bootstrap_external>", line 883, in exec_module
  File "<frozen importlib._bootstrap>", line 241, in _call_with_frames_removed
  File "/opt/prefect/production_test_dbt_run.py", line 6, in <module>
    from prefect_dbt.cli.commands import DbtCliProfile
ModuleNotFoundError: No module named 'prefect_dbt'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/usr/local/lib/python3.10/site-packages/prefect/engine.py", line 260, in retrieve_flow_then_begin_flow_run
    flow = await load_flow_from_flow_run(flow_run, client=client)
  File "/usr/local/lib/python3.10/site-packages/prefect/client/utilities.py", line 47, in with_injected_client
    return await fn(*args, **kwargs)
  File "/usr/local/lib/python3.10/site-packages/prefect/deployments.py", line 182, in load_flow_from_flow_run
    flow = await run_sync_in_worker_thread(import_object, str(import_path))
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/asyncutils.py", line 69, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.10/site-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.10/site-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 193, in import_object
    module = load_script_as_module(script_path)
  File "/usr/local/lib/python3.10/site-packages/prefect/utilities/importtools.py", line 156, in load_script_as_module
    raise ScriptError(user_exc=exc, path=path) from exc
prefect.exceptions.ScriptError: Script at 'production_test_dbt_run.py' encountered an exception