prefect: Intermittent fatal error running deployment with docker infra

First check

  • I added a descriptive title to this issue.
  • I used the GitHub search to find a similar issue and didn’t find it.
  • I searched the Prefect documentation for this issue.
  • I checked that this issue is related to Prefect and not one of its dependencies.

Bug summary

We run 40-50 deployments per day, and one or two of them usually crash within the first 5 seconds with a Docker error. Re-running the deployment always works. All of our deployments use Docker infrastructure. No logs are sent to the server UI, but they can be retrieved from the agent.

Let me know if there’s any additional information you need.

Reproduction

Since the problem is intermittent, it’s difficult to put together a minimal example.

Error

08:11:55.728 | INFO    | prefect.agent - Submitting flow run '00dc9668-fb5b-4935-9bfb-0892a865d982'
08:11:56.201 | ERROR   | prefect.agent - Failed to submit flow run '00dc9668-fb5b-4935-9bfb-0892a865d982' to infrastructure.
Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/infrastructure/docker.py", line 443, in _get_client
    docker_client = docker.from_env()
AttributeError: module 'docker' has no attribute 'from_env'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/lib/python3.8/dist-packages/prefect/agent.py", line 199, in submit_run
    await self.task_group.start(submit_flow_run, flow_run, infrastructure)
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 807, in start
    return await future
  File "/usr/local/lib/python3.8/dist-packages/prefect/infrastructure/submission.py", line 48, in submit_flow_run
    return await infrastructure.run(task_status=task_status)
  File "/usr/local/lib/python3.8/dist-packages/prefect/infrastructure/docker.py", line 209, in run
    container_id = await run_sync_in_worker_thread(self._create_and_start_container)
  File "/usr/local/lib/python3.8/dist-packages/prefect/utilities/asyncutils.py", line 56, in run_sync_in_worker_thread
    return await anyio.to_thread.run_sync(call, cancellable=True)
  File "/usr/local/lib/python3.8/dist-packages/anyio/to_thread.py", line 31, in run_sync
    return await get_asynclib().run_sync_in_worker_thread(
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 937, in run_sync_in_worker_thread
    return await future
  File "/usr/local/lib/python3.8/dist-packages/anyio/_backends/_asyncio.py", line 867, in run
    result = context.run(func, *args)
  File "/usr/local/lib/python3.8/dist-packages/prefect/infrastructure/docker.py", line 245, in _create_and_start_container
    docker_client = self._get_client()
  File "/usr/local/lib/python3.8/dist-packages/prefect/infrastructure/docker.py", line 445, in _get_client
    except docker.errors.DockerException as exc:
AttributeError: module 'docker' has no attribute 'errors'
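Both errors are raised on a `docker` module object that exists but lacks `from_env` and `errors`. The maintainers later attribute this to Prefect’s lazy loading of the Docker module (`lazy_import` from `prefect.utilities.importtools`). The sketch below assumes that `lazy_import` behaves like the standard-library `importlib.util.LazyLoader` recipe; we have not verified Prefect’s actual implementation. It shows that such a module is registered in `sys.modules` immediately, while its body runs only on the first attribute access. `fake_docker` and `FAKE_DOCKER_EXECUTED` are hypothetical names. Our reading of CPython 3.8’s `importlib/util.py` is that the lazy module switches back to a plain module *before* its body finishes executing. If so, a second thread touching `docker.from_env` or `docker.errors` during that window would see exactly these `AttributeError`s.

```python
import importlib
import importlib.util
import os
import sys
import tempfile
from pathlib import Path
from types import ModuleType


def lazy_import(name: str) -> ModuleType:
    """Standard-library LazyLoader recipe: register the module now, run its body later.

    A stand-in for Prefect's lazy_import, not a copy of it.
    """
    if name in sys.modules:
        return sys.modules[name]
    spec = importlib.util.find_spec(name)
    if spec is None:
        raise ModuleNotFoundError(f"No module named {name!r}")
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)  # marks the module as lazy; its body has NOT run yet
    return module


# Write a hypothetical stand-in for the docker package into a temp directory.
os.environ.pop("FAKE_DOCKER_EXECUTED", None)
tmp_dir = tempfile.mkdtemp()
Path(tmp_dir, "fake_docker.py").write_text(
    "import os\n"
    "os.environ['FAKE_DOCKER_EXECUTED'] = '1'  # side effect proves the body ran\n"
    "def from_env():\n"
    "    return 'client'\n"
)
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()

fake_docker = lazy_import("fake_docker")
executed_before = "FAKE_DOCKER_EXECUTED" in os.environ
client = fake_docker.from_env()  # first attribute access executes the module body
executed_after = "FAKE_DOCKER_EXECUTED" in os.environ

print(executed_before, executed_after, client)  # False True client
```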

Versions

Version:             2.1.1
API version:         0.8.0
Python version:      3.8.10
Git commit:          dc2ba222
Built:               Thu, Aug 18, 2022 10:18 AM
OS/Arch:             linux/x86_64
Profile:             default
Server type:         ephemeral
Server:
  Database:          sqlite
  SQLite version:    3.31.1

Additional context

Here’s the deployment file, in case it’s of any help, with certain strings removed.

###
### A complete description of a Prefect Deployment for flow 'RA Integration'
###
name: RA Integration (prod)
description: Flow for the Reporting Analytics (RA) integration.
version: fbd05955547891a8bc78436ca1598ffd
# The work queue that will handle this deployment's runs
work_queue_name: integration-prod
tags:
- prod
- integration
parameters: {}
schedule: null
infra_overrides:
  image: REMOVED
infrastructure:
  type: docker-container
  env: {}
  labels: {}
  name: null
  command:
  - python
  - -m
  - prefect.engine
  image: prefecthq/prefect:2.1.1-python3.8
  image_pull_policy: null
  image_registry: null
  networks: []
  network_mode: null
  auto_remove: false
  volumes: []
  stream_output: true

###
### DO NOT EDIT BELOW THIS LINE
###
flow_name: RA Integration
manifest_path: null
storage:
  bucket_path: flows/ra_integration
  azure_storage_connection_string: REMOVED
  azure_storage_account_name: null
  azure_storage_account_key: null
  _is_anonymous: true
  _block_document_name: REMOVED
  _block_document_id: REMOVED
  _block_type_slug: azure
path: null
entrypoint: orchestration/flows/ra_integration.py:default
parameter_openapi_schema:
  title: Parameters
  type: object
  properties:
    loglevel:
      title: loglevel
      default: INFO
      type: string
    integration_parameters:
      title: integration_parameters
      default:
        namespace:
        - LOCATION_ACTIVITY_DB
        - COREDB
        backfill: false
        manual stream select: null
        manual delta key override: null
        manual delta range override:
          lower-bound: null
          upper-bound: null
  required: null
  definitions: null

About this issue

  • State: open
  • Created 2 years ago
  • Reactions: 3
  • Comments: 25 (14 by maintainers)

Most upvoted comments

I keep getting the message “State message: Flow run infrastructure exited with non-zero status code 1.” when running on Docker. It’s intermittent and only happens a few times, but since I run quite a few flows a day, it’s becoming a problem for me. It happens with any flow at any time, with no pattern. I need help…

I found it to be pretty much a drop-in replacement for agents. Our agents had already been updated to use work queues, work pools, etc., when I did it on Prefect 2.10.6.

I only had a minor issue with environment variables, where a specific naming scheme stopped being supported (https://github.com/PrefectHQ/prefect/issues/9397).

Probably still wise to attempt it on a test setup beforehand 😃

I’ve managed to remedy this problem by adding import docker to the module that contains my @task, which is run concurrently (with my_task.map(input_values) or my_task.submit(input_value)). Looking forward to a more permanent fix 😃
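The workaround above amounts to making sure the deferred import has finished before any concurrent work starts. Below is a stdlib-only sketch of that idea, under several assumptions. The standard-library `LazyLoader` recipe stands in for Prefect’s `lazy_import`. `slow_docker` is a hypothetical module with an artificially slow body. A `ThreadPoolExecutor` stands in for `my_task.map(...)`. Here the load is forced by touching one attribute in the main thread; we are assuming, not confirming, that the commenter’s module-level `import docker` has the same effect.

```python
import importlib
import importlib.util
import sys
import tempfile
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from types import ModuleType


def lazy_import(name: str) -> ModuleType:
    """Standard-library LazyLoader recipe (a stand-in for Prefect's lazy_import)."""
    if name in sys.modules:
        return sys.modules[name]
    spec = importlib.util.find_spec(name)
    if spec is None:
        raise ModuleNotFoundError(f"No module named {name!r}")
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)  # body is deferred until first attribute access
    return module


# Hypothetical slow-to-import stand-in for the docker package.
tmp_dir = tempfile.mkdtemp()
Path(tmp_dir, "slow_docker.py").write_text(
    "import time\n"
    "time.sleep(0.2)  # widen the window in which the module is half-initialized\n"
    "def from_env():\n"
    "    return 'client'\n"
)
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()

docker = lazy_import("slow_docker")  # mirrors docker = lazy_import("docker")

# Workaround: trigger the deferred load once, in the main thread, before any
# worker threads exist. Afterwards it behaves as an ordinary, fully loaded module.
_ = docker.from_env


def my_task(_: int) -> str:
    # Hypothetical task body that needs a Docker client.
    return docker.from_env()


with ThreadPoolExecutor(max_workers=20) as pool:
    results = list(pool.map(my_task, range(20)))

print(set(results))  # {'client'}
```

The design choice is to pay the import cost once, serially, so that worker threads only ever see a fully initialized module.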

Any updates on the status of the fix? We’re running Prefect 2.9.0 and still see a lot of these issues, especially at times of high load on our self-hosted Prefect database. We’re not sure how causal that link is, but the correlation is quite visible.

Thanks for all these additional details! It looks like there is definitely some sort of concurrency issue with the lazy loader. We’re going to need to create a minimal reproducible example (MRE) that uses lazy loading of the Docker module to isolate this from all of the other mechanisms. Something like…

import anyio
from prefect.utilities.importtools import lazy_import

docker = lazy_import("docker")

async def load_attribute():
    docker.errors  # touching any attribute forces the lazily imported module to load

async def main():
    async with anyio.create_task_group() as tg:
        for _ in range(20):
            tg.start_soon(load_attribute)

anyio.run(main)

Once we’ve reproduced the issue in this isolated context, we can investigate a fix. I’m a bit confused after looking at our implementation, as it seems like it should be robust to concurrency.
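One caveat on the MRE above: `load_attribute` never awaits, so the anyio tasks run one after another on the event-loop thread, and the first task finishes loading the module before the next one starts. The traceback in the original report instead reaches `docker` through `run_sync_in_worker_thread`, that is, from a worker thread, so a thread-based variant may be a closer match. The sketch below makes several assumptions. The standard-library `LazyLoader` recipe stands in for Prefect’s `lazy_import`. `racy_docker` is a hypothetical module whose body sleeps to widen the race window. Plain `threading` replaces anyio. Whether any thread fails depends on the Python version’s `LazyLoader`, so treat a count of zero as inconclusive rather than as proof that the code is safe.

```python
import importlib
import importlib.util
import sys
import tempfile
import threading
from pathlib import Path
from types import ModuleType


def lazy_import(name: str) -> ModuleType:
    """Standard-library LazyLoader recipe (a stand-in for Prefect's lazy_import)."""
    if name in sys.modules:
        return sys.modules[name]
    spec = importlib.util.find_spec(name)
    if spec is None:
        raise ModuleNotFoundError(f"No module named {name!r}")
    loader = importlib.util.LazyLoader(spec.loader)
    spec.loader = loader
    module = importlib.util.module_from_spec(spec)
    sys.modules[name] = module
    loader.exec_module(module)  # body is deferred until first attribute access
    return module


# Hypothetical stand-in for docker whose body is slow to execute.
tmp_dir = tempfile.mkdtemp()
Path(tmp_dir, "racy_docker.py").write_text(
    "import time\n"
    "time.sleep(0.2)  # keep the module half-initialized for a while\n"
    "class errors:\n"
    "    DockerException = Exception\n"
)
sys.path.insert(0, tmp_dir)
importlib.invalidate_caches()

docker = lazy_import("racy_docker")  # mirrors docker = lazy_import("docker")

N_THREADS = 20
barrier = threading.Barrier(N_THREADS)
results_lock = threading.Lock()
failures: list = []
successes: list = []


def load_attribute() -> None:
    barrier.wait()  # release all threads at roughly the same moment
    try:
        _ = docker.errors
    except AttributeError as exc:
        with results_lock:
            failures.append(exc)
    else:
        with results_lock:
            successes.append(True)


threads = [threading.Thread(target=load_attribute) for _ in range(N_THREADS)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(f"{len(failures)} of {N_THREADS} threads saw a half-initialized module")
```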