Flask-SocketIO: Problem running SocketIO with Celery

Hi @miguelgrinberg, first of all, congratulations on your work. I’m trying to use Socket.IO to broadcast messages to connected clients. First, I did this synchronously and it worked well, exactly as I hoped. After that, I tried to do the same thing from a Celery task, but that fails with “working outside of application context” (as some people have already reported here in another issue). So, I tried to follow the documentation (http://flask-socketio.readthedocs.io/en/latest/#using-multiple-workers).

Old code:

from flask_socketio import SocketIO, emit
from xproject.app import create_app

app = create_app()
socketio = SocketIO(app)
if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)

from flask_socketio import emit

def broadcast_crud(operation, entity_name):
    emit('entity_change', {'operation': operation, 'entity_name': entity_name}, 
        broadcast=True, namespace='/websocket')

Old console output:

 * Restarting with stat
 * Debugger is active!
 * Debugger pin code: 270-121-125
(79052) wsgi starting up on http://127.0.0.1:8081
(79052) accepted ('127.0.0.1', 60479)
127.0.0.1 - - [07/Sep/2016 17:02:51] "GET /socket.io/?EIO=3&transport=polling&t=1473278571127-36 HTTP/1.1" 200 401 0.000890
127.0.0.1 - - [07/Sep/2016 17:02:51] "POST /socket.io/?EIO=3&transport=polling&t=1473278571135-37&sid=cda480af083e4a0c87ad9cc9b085bbe0 HTTP/1.1" 200 219 0.001070
(79052) accepted ('127.0.0.1', 60481)
(79052) accepted ('127.0.0.1', 60483)

New code:

from flask_socketio import SocketIO
from xproject.app import create_app

from xproject.settings import CeleryConfig

app = create_app()
socketio = SocketIO(app, message_queue=CeleryConfig.CELERY_BROKER_URL)
if __name__ == '__main__':
    socketio.run(app, debug=True, port=8081)

New console output:

 * Restarting with stat
 * Debugger is active!
 * Debugger pin code: 270-121-125
(79241) wsgi starting up on http://127.0.0.1:8081
(79241) accepted ('127.0.0.1', 60523)

When I run the new code, the WebSocket connections stay pending forever (according to Chrome’s Developer Tools). Before that change, Chrome’s Developer Tools showed status 200 on the WebSocket connections (as you can see in the server console too). The same thing happens with HTTP requests to my REST services, which were also working fine before.

About Celery configuration

I configured Celery with Redis as the result backend (the “task logger”) and RabbitMQ as the message broker. See my Celery configuration:

from celery import Celery

from xproject.settings import PROJECT_APPS, CeleryConfig

celery_application = Celery('app', broker=CeleryConfig.CELERY_BROKER_URL)
celery_application.config_from_object(CeleryConfig)
celery_application.autodiscover_tasks(lambda: PROJECT_APPS)

class CeleryConfig(object):
    CELERY_BROKER_URL = 'amqp://xproject:password@localhost:5672/xprojecthost'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/0'
    CELERYD_CONCURRENCY = 5

With these two services configured, my project executes asynchronous tasks normally. I always start Celery before running my server app.

Celery execution log:
---- **** ----- 
--- * ***  * -- Darwin-15.6.0-x86_64-i386-64bit
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         app:0x10a46ad10
- ** ---------- .> transport:   amqp://xproject:**@localhost:5672/xprojecthost
- ** ---------- .> results:     redis://localhost:6379/0
- *** --- * --- .> concurrency: 1 (prefork)
-- ******* ---- 
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery

So, I tried many changes without any success. If you can help me, I’ll be grateful.

Thanks.

About this issue

  • State: closed
  • Created 8 years ago
  • Comments: 21 (9 by maintainers)

Most upvoted comments

@andremeirelesa uwsgi is not compatible with eventlet; you have to use gevent. But even though gevent is compatible, the setup to get WebSocket working is a bit tricky. So basically, your options are:

  • use gunicorn instead of uwsgi. Then you can still use eventlet.
  • stay with uwsgi and switch to gevent, plus uwsgi’s own websocket implementation. This should work, but there is no development server option; you always have to run the application through uwsgi.

@andremeirelesa I think I know what’s happening.

It seems you are following the Using Multiple Workers section of the documentation to configure your Celery workers. This is incorrect: the multiple workers in that context refer to running multiple Socket.IO servers. In your case, it seems you have a single Socket.IO server.

Emitting from Celery is a different use case, because the Celery workers do not run Socket.IO servers, they connect to an existing server through the message queue. The part of the documentation that covers this is Emitting from an External Process.

The difference is really minimal. For example, in the Socket.IO server case you initialize the SocketIO object as follows:

socketio = SocketIO(app, message_queue='redis://')

While in the case of an external process, such as a Celery worker, you do this:

socketio = SocketIO(message_queue='redis://')

The difference is that when you initialize SocketIO as a server, you provide the Flask application instance as a first argument. This tells Socket.IO to create a server and associate it with the given application. If you do not pass an application instance, then Socket.IO just connects to the message queue, but does not start a new server, which is what you want to happen in the Celery workers.
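
To make the external-process case concrete, here is a minimal sketch of how the broadcast_crud helper from the question could be adapted for a Celery worker. The make_external_emitter name and the emitter parameter are hypothetical, introduced only for illustration. The sketch assumes the Socket.IO server was started with the same message_queue URL that is passed here, and the 'redis://' default is only a placeholder.

```python
def make_external_emitter(queue_url='redis://'):
    """Build a SocketIO object that only publishes to the message queue.

    Hypothetical helper: the name and the default URL are assumptions.
    """
    # Imported lazily so the module can be loaded without Flask-SocketIO.
    from flask_socketio import SocketIO
    # No Flask app is passed, so no Socket.IO server is created here.
    return SocketIO(message_queue=queue_url)


def broadcast_crud(emitter, operation, entity_name):
    """Publish an 'entity_change' event to the clients on /websocket."""
    payload = {'operation': operation, 'entity_name': entity_name}
    # The emit() method of the SocketIO object is used instead of the
    # module-level emit(), which depends on an active request context.
    emitter.emit('entity_change', payload, namespace='/websocket')
    return payload
```

Inside a Celery task this would be called as broadcast_crud(make_external_emitter(url), 'update', 'user'), where url is whatever queue URL the server was configured with. Passing the emitter as an argument is just a design choice that keeps the helper easy to exercise without a running queue.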