Flask-SocketIO: Problem to run SocketIO with Celery
Hi @miguelgrinberg, first of all, congratulations on your work. I’m trying to use Socket.IO to broadcast messages to connected clients. First, I did this synchronously and it worked well, exactly as I hoped. After that, I tried to do the same thing from a Celery task, but that fails with “working outside of application context” (as some people have already reported here in other issues). So, I tried to follow the documentation (http://flask-socketio.readthedocs.io/en/latest/#using-multiple-workers).
Old code:
from flask_socketio import SocketIO, emit
from xproject.app import create_app

app = create_app()
socketio = SocketIO(app)

if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)

from flask_socketio import emit

def broadcast_crud(operation, entity_name):
    emit('entity_change', {'operation': operation, 'entity_name': entity_name},
         broadcast=True, namespace='/websocket')
Old console output:
* Restarting with stat
* Debugger is active!
* Debugger pin code: 270-121-125
(79052) wsgi starting up on http://127.0.0.1:8081
(79052) accepted ('127.0.0.1', 60479)
127.0.0.1 - - [07/Sep/2016 17:02:51] "GET /socket.io/?EIO=3&transport=polling&t=1473278571127-36 HTTP/1.1" 200 401 0.000890
127.0.0.1 - - [07/Sep/2016 17:02:51] "POST /socket.io/?EIO=3&transport=polling&t=1473278571135-37&sid=cda480af083e4a0c87ad9cc9b085bbe0 HTTP/1.1" 200 219 0.001070
(79052) accepted ('127.0.0.1', 60481)
(79052) accepted ('127.0.0.1', 60483)
New code:
from flask_socketio import SocketIO
from xproject.app import create_app
from xproject.settings import CeleryConfig

app = create_app()
socketio = SocketIO(app, message_queue=CeleryConfig.CELERY_BROKER_URL)

if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)
New console output:
* Restarting with stat
* Debugger is active!
* Debugger pin code: 270-121-125
(79241) wsgi starting up on http://127.0.0.1:8081
(79241) accepted ('127.0.0.1', 60523)
When I run the new code, the WebSocket connections always stay pending (according to Chrome’s Developer Tools). Before the change, Chrome’s Developer Tools showed status 200 for those connections (as you can also see in the server console). The same thing happens with HTTP requests to REST services that were also working fine before.
About Celery configuration
I configured Celery with Redis as the “task logger” (result backend) and RabbitMQ as the message queue manager. Here is my Celery configuration:
from celery import Celery
from xproject.settings import PROJECT_APPS, CeleryConfig

celery_application = Celery('app', broker=CeleryConfig.CELERY_BROKER_URL)
celery_application.config_from_object(CeleryConfig)
celery_application.autodiscover_tasks(lambda: PROJECT_APPS)

class CeleryConfig(object):
    CELERY_BROKER_URL = 'amqp://xproject:password@localhost:5672/xprojecthost'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    CELERYD_CONCURRENCY = 5
With this configuration using these two services, my project executes asynchronous tasks normally. I always start Celery before starting my server app.
Celery execution log:
---- **** -----
--- * *** * -- Darwin-15.6.0-x86_64-i386-64bit
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: app:0x10a46ad10
- ** ---------- .> transport: amqp://xproject:**@localhost:5672/xprojecthost
- ** ---------- .> results: redis://localhost:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
So, I tried many changes without any success. If you can help me, I’ll be grateful.
Thanks.
About this issue
- State: closed
- Created 8 years ago
- Comments: 21 (9 by maintainers)
@andremeirelesa uwsgi is not compatible with eventlet, so you have to use gevent. Even though gevent is compatible, the setup to get WebSocket to work is a bit tricky. So basically, your options are:
@andremeirelesa I think I know what’s happening.
It seems you are following the Using Multiple Workers section of the documentation to configure your Celery workers. This is incorrect: the multiple workers in that context refer to running multiple Socket.IO servers. In your case, it seems you have a single Socket.IO server.
Emitting from Celery is a different use case, because the Celery workers do not run Socket.IO servers, they connect to an existing server through the message queue. The part of the documentation that covers this is Emitting from an External Process.
The difference is really minimal. For example, in the Socket.IO server case you initialize the SocketIO object as follows:
While in the case of an external process, such as a Celery worker, you do this:
The difference is that when you initialize SocketIO as a server, you provide the Flask application instance as a first argument. This tells Socket.IO to create a server and associate it with the given application. If you do not pass an application instance, then Socket.IO just connects to the message queue, but does not start a new server, which is what you want to happen in the Celery workers.
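The two initializations described above can be sketched roughly as follows. This is only a minimal sketch under stated assumptions, not the snippet from the documentation: the helper names make_server_socketio and make_worker_socketio are hypothetical, the queue URL is simply the broker URL from the question, and an amqp:// URL is assumed to need the kombu package installed alongside Flask-SocketIO. The broadcast_crud helper takes the SocketIO instance as a parameter so that it no longer relies on the context-aware emit() function, which is what fails inside a Celery task.

```python
# Broker URL copied from the question; reused here as the Socket.IO message queue.
QUEUE_URL = 'amqp://xproject:password@localhost:5672/xprojecthost'


def make_server_socketio(app, queue_url=QUEUE_URL):
    """Web process: passing the Flask app creates a Socket.IO server."""
    # Local import keeps this sketch importable where Flask-SocketIO is absent.
    from flask_socketio import SocketIO
    return SocketIO(app, message_queue=queue_url)


def make_worker_socketio(queue_url=QUEUE_URL):
    """Celery worker: no app is passed, so no server is started."""
    from flask_socketio import SocketIO
    return SocketIO(message_queue=queue_url)


def broadcast_crud(socketio, operation, entity_name):
    """Emit to all clients of the namespace through a SocketIO instance."""
    socketio.emit('entity_change',
                  {'operation': operation, 'entity_name': entity_name},
                  namespace='/websocket')
```

In a Celery worker, one would presumably create the instance once at module level with make_worker_socketio() and then call broadcast_crud(socketio, 'update', 'customer') from inside the task, while the web process keeps using the instance returned by make_server_socketio(app).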