sanic: app.add_task() triggers RuntimeWarning: coroutine was never awaited
This is likely the same problem as #1491.
Describe the bug
When running code that uses app.add_task() I get a runtime warning that the coroutine passed to add_task was never awaited.
I want to make sure that this task runs once for the whole application rather than once per worker. To do that I am using a multiprocessing Lock and Value to track whether the task is running.
Execute the code below and wait for the output “Finished sync”, then press ctrl-c. I see:
INFO:__main__:Finished sync
[2021-05-19 10:39:41 -0500] [976249] [INFO] Starting worker [976249]
INFO:sanic.root:Starting worker [976249]
^C[2021-05-19 10:39:57 -0500] [976247] [INFO] Received signal SIGINT. Shutting down.
INFO:sanic.root:Received signal SIGINT. Shutting down.
[2021-05-19 10:39:57 -0500] [976253] [INFO] Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976251] [INFO] Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976256] [INFO] Stopping worker [976256]
INFO:sanic.root:Stopping worker [976253]
[2021-05-19 10:39:57 -0500] [976252] [INFO] Stopping worker [976252]
INFO:sanic.root:Stopping worker [976251]
[2021-05-19 10:39:57 -0500] [976255] [INFO] Stopping worker [976255]
[2021-05-19 10:39:57 -0500] [976250] [INFO] Stopping worker [976250]
INFO:sanic.root:Stopping worker [976256]
[2021-05-19 10:39:57 -0500] [976254] [INFO] Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976249] [INFO] Stopping worker [976249]
INFO:sanic.root:Stopping worker [976252]
INFO:sanic.root:Stopping worker [976250]
INFO:sanic.root:Stopping worker [976249]
INFO:sanic.root:Stopping worker [976255]
INFO:sanic.root:Stopping worker [976254]
[2021-05-19 10:39:57 -0500] [976247] [INFO] Server Stopped
INFO:sanic.root:Server Stopped
sys:1: RuntimeWarning: coroutine 'DnsSynchronizer.start_sync' was never awaited
Code snippet
#!/usr/bin/env python3

import warnings
with warnings.catch_warnings():
    import re
    import sys
    import argparse
    import os
    import logging
    import logging.config
    import json as pjson
    from pathlib import Path
    from pprint import pprint, pformat
    from sanic import Sanic
    from sanic.response import text
    from sanic.response import json
    from sanic.response import html
    import ipaddress
    import pdb
    from typing import List, Set, Tuple
    import time
    from multiprocessing import Value, Lock

SCRIPT_DIR = Path(__file__).parent.absolute()

app = Sanic(__name__)
#app.static('/static', str(SCRIPT_DIR.joinpath('static_content').resolve()))


def get_logger():
    return logging.getLogger(__name__)


class DnsSynchronizer(object):
    SYNC_NEEDED = 0
    SYNC_RUNNING = 1
    SYNC_FINISHED = 2

    def __init__(self, sync_state, lock):
        self.sync_state = sync_state
        self.lock = lock
        # the sync will start in one process right away, so assume it's running
        # this variable is here to avoid a lock once the sync finishes
        self.sync_running = True
        self.execute_sync = False

    async def start_sync(self):
        """
        If this process is to run the sync, start it.
        """
        self.lock.acquire()
        try:
            if self.sync_state.value == DnsSynchronizer.SYNC_NEEDED:
                get_logger().info("Sync is needed, setting state")
                self.sync_state.value = DnsSynchronizer.SYNC_RUNNING
            else:
                get_logger().info("Not running sync")
                return
        finally:
            self.lock.release()

        get_logger().info("Starting sync")
        # don't use asyncio.sleep so that this looks like a non-async task
        time.sleep(60)

        self.lock.acquire()
        try:
            get_logger().info("Finished sync")
            self.sync_state.value = DnsSynchronizer.SYNC_FINISHED
            self.sync_running = False
        finally:
            self.lock.release()


@app.route("/", methods=["GET", "POST"])
async def update_dns(request):
    get_logger().info("Received request: %s", pformat(request.json))


def main_method(args):
    sync_state = Value('b', DnsSynchronizer.SYNC_NEEDED)
    lock = Lock()

    get_logger().info("Starting main")
    if not hasattr(app.ctx, 'dns_synchronizer'):
        app.ctx.dns_synchronizer = DnsSynchronizer(sync_state, lock)
        get_logger().info("adding sync task")
        # NOTE: start_sync() is called here, so a coroutine object is created
        # in the main process before the workers are forked
        app.add_task(app.ctx.dns_synchronizer.start_sync())
        get_logger().info("After adding sync task")

    get_logger().info("Starting app")
    # running with debug causes more verbose output and activates the Automatic Reloader
    app.run(host="localhost", port=args.port, debug=args.debug, access_log=False, workers=args.workers)


def main(argv=None):
    if argv is None:
        argv = sys.argv[1:]

    parser = argparse.ArgumentParser()
    parser.add_argument("-l", "--logconfig", dest="logconfig", help="logging configuration (default: logging.json)", default='logging.json')
    parser.add_argument("--debug", dest="debug", help="Enable interactive debugger on error", action='store_true')
    parser.add_argument("--workers", dest="workers", help="Number of worker processes", type=int, default=8)
    parser.add_argument("--port", dest="port", help="Port to run on", type=int, default=4000)
    args = parser.parse_args(argv)

    logging.basicConfig(level=logging.INFO)

    return main_method(args)


if __name__ == "__main__":
    sys.exit(main())
Expected behavior
No warnings.
Environment (please complete the following information):
- OS: Ubuntu Linux 20.04
- Version: Sanic 21.3.4
About this issue
- Original URL
- State: closed
- Created 3 years ago
- Comments: 23 (14 by maintainers)
I see no reason to deprecate it, and sometimes prefer this usage, especially when passing in some dynamic value. IMO it is a problem that can be fixed with better documentation.
@ahopkins : Sure, will add it.
I think we should just add a 'note:' to the documentation saying: if you are adding a task using this method before app.run is called, it is recommended to pass the callable to add_task, applying functools.partial first if arguments need to be bound, e.g.
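A minimal sketch of what such a note might illustrate (the task function slow_job, its label argument, and the app name are made up for this example; it assumes Sanic passes the app instance to a callable task when the callable accepts it):

import functools

from sanic import Sanic

app = Sanic("ExampleApp")


async def slow_job(app, label):
    # hypothetical long-running task standing in for the real work
    ...


# Pass the callable, not a coroutine object, so each worker creates and
# awaits its own coroutine; functools.partial binds the extra argument.
app.add_task(functools.partial(slow_job, label="nightly"))

# equivalent form when no extra arguments are needed:
# app.add_task(slow_job)

if __name__ == "__main__":
    app.run(host="localhost", port=4000, workers=4)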
The long explanation was given by @Tronic already, but this is just for more clarity.
If you actually call the callable while passing it to add_task, the coroutine object is created. This happens in the 'main' process; as the workers are created using fork, they get a copy of this coroutine object (and also of the loop object). If you don't call the callable, only a copy of the callable object gets passed to the worker via fork, and the original callable is never turned into a coroutine object in the main process.
In the workers this task is awaited, but in the main process it is not, and hence one sees the warning above. A side effect (discussed already) is that this task gets executed in each worker. If one has to run it only on one worker, or a few workers, one has to use multiprocessing synchronization mechanisms to achieve this (e.g. BoundedSemaphore).
The same example above is considerably simplified using a BoundedSemaphore as follows:
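A minimal sketch of that simplification (not the commenter's exact code; the app name, task body, and sleep are placeholders). It assumes the semaphore is created in the main process so the forked workers share it, and only the worker that wins the permit performs the sync:

#!/usr/bin/env python3
import asyncio
import logging
from multiprocessing import BoundedSemaphore

from sanic import Sanic
from sanic.response import text

logging.basicConfig(level=logging.INFO)

app = Sanic("SyncExample")

# Created in the main process before fork, so every worker shares it.
sync_permit = BoundedSemaphore(1)


async def start_sync(app):
    # Only the first worker to grab the permit runs the sync.
    if not sync_permit.acquire(block=False):
        logging.info("Another worker is handling the sync")
        return
    logging.info("Starting sync")
    await asyncio.sleep(60)  # stand-in for the real synchronization work
    logging.info("Finished sync")


@app.route("/")
async def handler(request):
    return text("ok")


if __name__ == "__main__":
    # Pass the callable so no coroutine object is created (and left
    # unawaited) in the main process.
    app.add_task(start_sync)
    app.run(host="localhost", port=4000, workers=4)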
Using main_process_start doesn't work, as that executes the task before the workers start up; I want the workers to be running while this task is executing. When I used before_server_start, that behaves like I want and doesn't give the warning. It looks like the documentation for add_task needs some updating to make it clear that it needs to be executed after the loop has started.
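For reference, a minimal sketch of that approach (the task body and app name are placeholders; in the reproduction above the task would be app.ctx.dns_synchronizer.start_sync()):

from sanic import Sanic

app = Sanic("ExampleApp")


async def start_sync():
    ...  # the synchronization work from the reproduction above


@app.listener("before_server_start")
async def schedule_sync(app, loop):
    # The listener runs in each worker after its event loop is up, so the
    # coroutine is created and awaited in the same process and nothing is
    # left unawaited in the main process.
    app.add_task(start_sync())


if __name__ == "__main__":
    app.run(host="localhost", port=4000, workers=4)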
From the top of my head: