sanic: app.add_task() triggers RuntimeWarning: coroutine was never awaited

This is likely the same problem as #1491.

Describe the bug

When running code that uses app.add_task(), I get a RuntimeWarning saying that the coroutine passed to add_task was never awaited.

I want this task to run once for the whole application rather than once per worker. To do that, I am using a multiprocessing Lock and Value to track whether the task is running.
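A minimal sketch of the "claim it once" step described above, separate from the full reproduction code further down. The names try_claim, worker and count_claims are hypothetical, and the fork start method is an assumption (POSIX only). The sketch uses the lock that a multiprocessing Value already carries, so the check and the update happen as one step.

```python
import multiprocessing as mp

SYNC_NEEDED, SYNC_RUNNING = 0, 1


def try_claim(state):
    """Atomically move the shared state from SYNC_NEEDED to SYNC_RUNNING.

    Returns True only for the first caller; every later caller gets False.
    """
    with state.get_lock():  # the lock that comes with a synchronized Value
        if state.value == SYNC_NEEDED:
            state.value = SYNC_RUNNING
            return True
        return False


def worker(state, claims):
    # Each process tries to claim the task; successful claims are counted.
    if try_claim(state):
        with claims.get_lock():
            claims.value += 1


def count_claims(n_workers=4):
    """Start n_workers processes and report how many of them claimed the task."""
    ctx = mp.get_context("fork")  # assumption: forked workers, POSIX only
    state = ctx.Value("b", SYNC_NEEDED)
    claims = ctx.Value("i", 0)
    procs = [ctx.Process(target=worker, args=(state, claims)) for _ in range(n_workers)]
    for proc in procs:
        proc.start()
    for proc in procs:
        proc.join()
    return claims.value
```

Because the check and the update happen under the same lock, only one process can see SYNC_NEEDED, however many workers start at once.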

To reproduce, run the code below and wait for the output “Finished sync”. Then press Ctrl-C. I see:

INFO:__main__:Finished sync
[2021-05-19 10:39:41 -0500] [976249] [INFO] Starting worker [976249]
INFO:sanic.root:Starting worker [976249]
^C[2021-05-19 10:39:57 -0500] [976247] [INFO] Received signal SIGINT. Shutting down.
INFO:sanic.root:Received signal SIGINT. Shutting down.
[2021-05-19 10:39:57 -0500] [976253] [INFO] Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976251] [INFO] Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976256] [INFO] Stopping worker [976256]
INFO:sanic.root:Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976252] [INFO] Stopping worker [976252]
INFO:sanic.root:Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976255] [INFO] Stopping worker [976255]
[2021-05-19 10:39:57 -0500] [976250] [INFO] Stopping worker [976250]
INFO:sanic.root:Stopping worker [976256]
[2021-05-19 10:39:57 -0500] [976254] [INFO] Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976249] [INFO] Stopping worker [976249]
INFO:sanic.root:Stopping worker [976252]
INFO:sanic.root:Stopping worker [976250]
INFO:sanic.root:Stopping worker [976249]
INFO:sanic.root:Stopping worker [976255]
INFO:sanic.root:Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976247] [INFO] Server Stopped
INFO:sanic.root:Server Stopped
sys:1: RuntimeWarning: coroutine 'DnsSynchronizer.start_sync' was never awaited

Code snippet

#!/usr/bin/env python3

import warnings
with warnings.catch_warnings():
    import re
    import sys
    import argparse
    import os
    import logging
    import logging.config
    import json as pjson
    from pathlib import Path
    from pprint import pprint, pformat
    from sanic import Sanic
    from sanic.response import text
    from sanic.response import json
    from sanic.response import html
    import ipaddress
    import pdb
    from typing import List, Set, Tuple
    import time

    from multiprocessing import Value, Lock
    

SCRIPT_DIR=Path(__file__).parent.absolute()

app = Sanic(__name__)
#app.static('/static', str(SCRIPT_DIR.joinpath('static_content').resolve()))

def get_logger():
    return logging.getLogger(__name__)


class DnsSynchronizer(object):
    SYNC_NEEDED = 0
    SYNC_RUNNING = 1
    SYNC_FINISHED = 2

    def __init__(self, sync_state, lock):
        self.sync_state = sync_state
        self.lock = lock
        # the sync will start in one process right away, so assume it's running
        # this variable is here to avoid a lock once the sync finishes
        self.sync_running = True
        self.execute_sync = False

        
    async def start_sync(self):
        """
        If this process is to run the sync, start it
        """
        self.lock.acquire()
        try:
            if self.sync_state.value == DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Sync is needed, setting state")
                self.sync_state.value = DnsSynchronizer.SYNC_RUNNING
            else:
                get_logger().info("Not running sync")
                return
        finally:
            self.lock.release()
        
        get_logger().info("Starting sync")
        # don't use asyncio.sleep so that this looks like a non-async task 
        time.sleep(60)

        self.lock.acquire()
        try:
            get_logger().info("Finished sync")
            self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
            self.sync_running = False
        finally:
            self.lock.release()
    
            
@app.route("/", methods=["GET","POST"])
async def update_dns(request):
    get_logger().info("Received request: %s", pformat(request.json))


def main_method(args):
    sync_state = Value('b', DnsSynchronizer.SYNC_NEEDED)
    lock = Lock()
    
    get_logger().info("Starting main")
    if not hasattr(app.ctx, 'dns_synchronizer'):
        app.ctx.dns_synchronizer = DnsSynchronizer(sync_state, lock)
    
    get_logger().info("adding sync task")
    app.add_task(app.ctx.dns_synchronizer.start_sync())
    get_logger().info("After adding sync task")

    get_logger().info("Starting app")
    # running with debug causes more verbose output and activates the Automatic Reloader
    app.run(host="localhost", port=args.port, debug=args.debug, access_log=False, workers=args.workers)

    
def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--logconfig", dest="logconfig", help="logging configuration (default: logging.json)", default='logging.json')
    parser.add_argument("--debug", dest="debug", help="Enable interactive debugger on error", action='store_true')
    parser.add_argument("--workers", dest="workers", help="Number of worker processes", type=int, default=8)
    parser.add_argument("--port", dest="port", help="Port to run on", type=int, default=4000)

    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO)
    return main_method(args)


if __name__ == "__main__":
    sys.exit(main())

Expected behavior

No warnings.

Environment (please complete the following information):

  • OS: Ubuntu Linux 20.04
  • Version: Sanic 21.3.4

About this issue

  • State: closed
  • Created 3 years ago
  • Comments: 23 (14 by maintainers)

Most upvoted comments

Should probably deprecate passing a coroutine object to avoid this problem. I wonder how much trouble it would cause app developers if we gave deprecation warnings or eventually removed the feature… Or could just say it in documentation.

I see no reason to deprecate it, and sometimes prefer this usage, especially when passing in some dynamic value. IMO it is a problem that can be fixed with better documentation.

@ahopkins : Sure will add it.

I think we should just add a ‘note:’ to the documentation saying: If you are adding a task with this method before app.run is called, it is recommended to pass the callable rather than a coroutine object, applying functools.partial first if the callable needs arguments.

e.g.

async def slow_work(...):
   ...

app = Sanic(...)
app.add_task(slow_work) # Note: we are passing the callable and not the coroutine object `slow_work(...)`
app.run(...)
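For the functools.partial case mentioned in the note, here is a small sketch that runs without Sanic. The arguments of slow_work and the helper worker_startup are hypothetical. worker_startup only stands in for whatever calls the task once a worker's loop is running, and is not Sanic's actual code.

```python
import asyncio
import functools


async def slow_work(zone, retries=1):
    # Placeholder for the real work.
    await asyncio.sleep(0)
    return f"synced {zone} with {retries} retries"


# Bind the arguments now. No coroutine object is created at this point.
task_factory = functools.partial(slow_work, "example.org", retries=3)


async def worker_startup(factory):
    # Stand-in for a worker: the coroutine is created and scheduled
    # only here, inside the running event loop.
    return await asyncio.create_task(factory())


result = asyncio.run(worker_startup(task_factory))
```

With Sanic the equivalent would be passing task_factory to app.add_task in place of slow_work("example.org", retries=3). To my understanding add_task may try calling the callable with the app instance first, so check how your version handles the extra argument.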

The long explanation was already given by @Tronic; this is just for more clarity.

If you call the callable when passing it to add_task, the coroutine object is created right there, in the ‘main’ process. Because the workers are created using fork, each of them gets a copy of this coroutine object (and of the loop object). If you don’t call the callable, only the callable itself is copied to the workers via fork, and no coroutine object is ever created in the main process.

In the workers, this task is awaited, but in the main process it is not, hence the warning above. A side effect (discussed already) is that the task gets executed in ‘each’ worker. To run it on only one worker, or on a few, one has to use multiprocessing synchronization mechanisms (e.g. BoundedSemaphore).
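The warning itself can be reproduced without Sanic or fork. The sketch below uses hypothetical helpers, create_and_drop and run_from_callable. It shows that calling an async function only creates a coroutine object, that discarding the object unawaited produces the RuntimeWarning, and that passing the callable avoids it.

```python
import asyncio
import gc
import warnings


async def slow_work():
    return "done"


def create_and_drop():
    """Create a coroutine object and discard it without awaiting it."""
    coro = slow_work()  # calling an async function only creates a coroutine object
    del coro            # nothing ever awaits it, so Python warns when it is finalized


def run_from_callable(func):
    """Create the coroutine inside the running loop, then await it."""
    async def runner():
        return await func()
    return asyncio.run(runner())


with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    create_and_drop()
    gc.collect()  # only matters on interpreters without reference counting

unawaited = [str(w.message) for w in caught
             if issubclass(w.category, RuntimeWarning)]

result = run_from_callable(slow_work)
```

In the issue, the main process plays the role of create_and_drop: it holds a coroutine object that only the workers' copies ever run.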

The example above can be considerably simplified using a BoundedSemaphore, as follows:

    async def start_sync(self):
        """
        If this process is to run the sync, start it
        """
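        concurrency = 1

        # Note: a semaphore created here is local to this call. To limit runs
        # across workers, it would have to be created once, before the workers
        # are forked (multiprocessing.BoundedSemaphore), and shared.
        max_runs = BoundedSemaphore(value=concurrency)

        with max_runs: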
            if self.sync_state.value == DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Sync is needed, setting state: %d", os.getpid())
                self.sync_state.value = DnsSynchronizer.SYNC_RUNNING
            else:
                get_logger().info("Not running sync")
                return

            get_logger().info("Starting sync: %d", os.getpid())
            time.sleep(60)
            get_logger().info("Finished sync: %d", os.getpid())

            self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
            self.sync_running = False

Using main_process_start doesn’t work as that executes the task before the workers start up. I want the workers to be executing while this task is executing.

When I used before_server_start, it behaved the way I want and didn’t give the warning. It looks like the documentation for add_task needs updating to make it clear that the task should be added after the loop has started.

@gatopeich Would you care to share a snippet of your solution? Maybe it’s worthwhile to add to the docs.

Off the top of my head:

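import multiprocessing as mp

@app.listener('after_server_start')
# Listeners are called with the app and the loop. The default lock is created
# once at import time, before the workers are forked, so all workers share it.
async def run_only_once(app, loop, lock=mp.Lock()):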
    if lock.acquire(False):
        print("Only one worker will get this lock and leave it locked forever...")